From c36b9c476feeb6a866de4f52471c1533f0c23ee0 Mon Sep 17 00:00:00 2001 From: ForteScarlet Date: Wed, 7 Feb 2024 20:55:57 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E6=8C=81=E7=BB=AD?= =?UTF-8?q?=E4=BC=9A=E8=AF=9D=E7=9A=84=E5=9F=BA=E6=9C=AC=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../simbot/common/collection/maps.native.kt | 4 +- .../AbstractContinuousSessionContext.kt | 50 ++++--- .../continuous/session/ContinuousSession.kt | 78 ++++++++++- .../session/ContinuousSessionContext.kt | 129 ++++++++++++++++-- .../session/EventContinuousSessionContext.kt | 102 ++++++++++++++ .../continuous/session/exceptions.kt | 10 ++ .../kotlin/ContinuousSessionTest.kt | 71 ++++++---- 7 files changed, 379 insertions(+), 65 deletions(-) create mode 100644 simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/EventContinuousSessionContext.kt diff --git a/simbot-commons/simbot-common-collection/src/nativeMain/kotlin/love/forte/simbot/common/collection/maps.native.kt b/simbot-commons/simbot-common-collection/src/nativeMain/kotlin/love/forte/simbot/common/collection/maps.native.kt index 59d7cb7d1..b1d481fcd 100644 --- a/simbot-commons/simbot-common-collection/src/nativeMain/kotlin/love/forte/simbot/common/collection/maps.native.kt +++ b/simbot-commons/simbot-common-collection/src/nativeMain/kotlin/love/forte/simbot/common/collection/maps.native.kt @@ -129,7 +129,7 @@ public actual inline fun MutableMap.removeValue(key: K, crossinline * */ public actual fun concurrentMutableMap(): MutableMap = - SynchronizedMutableMap(mutableMapOf()) + SynchronizedMutableMap(mutableMapOf()) // AtomicCopyOnWriteConcurrentMutableMap(emptyMap()) @PublishedApi @@ -208,7 +208,7 @@ private class SynchronizedMutableMap(private val map: MutableMap) : } -@Deprecated("似乎有Bug") +@Deprecated("似乎有Bug?") private class AtomicCopyOnWriteConcurrentMutableMap(initMap: Map) : MutableMap, MutableMapOperators { private open class MapBox(val map: Map) diff --git a/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/AbstractContinuousSessionContext.kt b/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/AbstractContinuousSessionContext.kt index 8335b2b3f..e72ae9ddf 100644 --- a/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/AbstractContinuousSessionContext.kt +++ b/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/AbstractContinuousSessionContext.kt @@ -21,6 +21,9 @@ * */ +@file:JvmName("ContinuousSessionContexts") +@file:JvmMultifileClass + package love.forte.simbot.extension.continuous.session import kotlinx.coroutines.* @@ -36,6 +39,8 @@ import love.forte.simbot.extension.continuous.session.ContinuousSessionContext.C import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException +import kotlin.jvm.JvmMultifileClass +import kotlin.jvm.JvmName /** @@ -46,6 +51,8 @@ public abstract class AbstractContinuousSessionContext(coroutineContext: C ContinuousSessionContext { protected val sessions: MutableMap> = concurrentMutableMap() protected val launchScope: CoroutineScope = CoroutineScope(coroutineContext) + protected val subScope: CoroutineScope = + if (coroutineContext[Job] == null) launchScope else CoroutineScope(coroutineContext.minusKey(Job)) protected abstract fun computeSession(key: Any, inSession: InSession): ContinuousSessionProvider @@ -56,49 +63,55 @@ public abstract class AbstractContinuousSessionContext(coroutineContext: C ): ContinuousSessionProvider { return when (strategy) { FAILURE -> { - sessions.computeValue(key) { _, old -> + sessions.computeValue(key) { k, old -> if (old != null) throw IllegalStateException("Session with key $key already exists") - computeSession(key, inSession) + computeSession(k, inSession) }!! } REPLACE -> { - sessions.computeValue(key) { _, old -> - old?.cancel(/* TODO REPLACED EXCEPTION? */) - computeSession(key, inSession) + sessions.computeValue(key) { k, old -> + old?.cancel(ReplacedBecauseOfConflictSessionKeyException("conflict key $k")) + computeSession(k, inSession) }!! } - OLD -> { - sessions.computeValueIfAbsent(key) { computeSession(key, inSession) } + EXISTING -> { + sessions.computeValueIfAbsent(key) { k -> computeSession(k, inSession) } } } } + // 是否检测 isActive? + override fun get(key: Any): ContinuousSessionProvider? = sessions[key] + override fun contains(key: Any): Boolean = sessions.containsKey(key) override fun remove(key: Any): ContinuousSessionProvider? = sessions[key] } +/** + * 创建一个 [ContinuousSessionContext] 的基础实现类型。 + */ +@JvmName("createContinuousSessionContext") +public fun ContinuousSessionContext(coroutineContext: CoroutineContext): ContinuousSessionContext = + SimpleContinuousSessionContext(coroutineContext) -internal class SimpleContinuousSessionContext(coroutineContext: CoroutineContext) : +private class SimpleContinuousSessionContext(coroutineContext: CoroutineContext) : AbstractContinuousSessionContext(coroutineContext) { private val parentJob = coroutineContext[Job] - // override fun toString(): String { - // // TODO - // return sessions.toString() - // } - override fun computeSession(key: Any, inSession: InSession): SimpleSessionImpl { val job = Job(parentJob) val channel = Channel>() - val session = SimpleSessionImpl(key, job, channel, launchScope) + val session = SimpleSessionImpl(key, job, channel, subScope) job.invokeOnCompletion { - channel.close(it) sessions.removeValue(key) { session } } + job.invokeOnCompletion { + channel.close(it) + } launchScope.launch { kotlin.runCatching { @@ -114,15 +127,16 @@ internal class SimpleContinuousSessionContext(coroutineContext: CoroutineC } } -internal data class SessionData(val value: T, val continuation: CancellableContinuation) - +private data class SessionData(val value: T, val continuation: CancellableContinuation) -internal class SimpleSessionImpl( +private class SimpleSessionImpl( private val key: Any, private val job: CompletableJob, private val channel: Channel>, private val launchScope: CoroutineScope ) : ContinuousSession { + override val coroutineContext: CoroutineContext + get() = launchScope.coroutineContext override val isActive: Boolean get() = job.isActive diff --git a/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/ContinuousSession.kt b/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/ContinuousSession.kt index 90e9083c9..3a0ad0f02 100644 --- a/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/ContinuousSession.kt +++ b/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/ContinuousSession.kt @@ -23,58 +23,128 @@ package love.forte.simbot.extension.continuous.session +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import love.forte.simbot.ability.CompletionAware +import love.forte.simbot.ability.OnCompletion +import kotlin.coroutines.CoroutineContext /** + * 一组 `Session` 的元素之一, + * 用来向 [ContinuousSessionReceiver] 推送事件 [T] 并获悉结果 [R]。 + * + * ```kotlin + * val session = context.session(Key()) { + * val next = await { v -> v.toResult() } // 假设 toResult() 将事件转化为结果 + * } ↑ | + * |-- | ---------------------| + * | |-----------| + * ↓ | + * val result = session.push(value) // 此处得到 v.toResult() 的结果 + * ``` * * @author ForteScarlet */ public interface ContinuousSessionProvider : CompletionAware { /** + * 推送一个事件到对应的 [ContinuousSessionReceiver] 中并挂起直到将其 + * [消费][ContinuousSessionReceiver.await] 或被关闭。 + * * @throws SessionPushOnFailureException 如果推送行为本身失败, * 例如session已经结束或关闭。 * @throws SessionAwaitOnFailureException 如果推送行为成功、 * 但是在 [ContinuousSessionReceiver.await] 时出现了异常(例如构造响应结果时出现异常) * @throws */ + // TODO @ST? public suspend fun push(value: T): R + /** + * 关闭对应的 `session`。 + * 关闭后会同时将对应的 `session` 从对应的 [ContinuousSessionContext] 中移除。 + */ public fun cancel(cause: Throwable?) + /** + * 关闭对应的 `session`。 + * 关闭后会同时将对应的 `session` 从对应的 [ContinuousSessionContext] 中移除。 + */ public fun cancel() { cancel(null) } + /** + * 是否处于活跃状态。 + */ public val isActive: Boolean + + /** + * 是否已经完成。 + */ public val isCompleted: Boolean + + /** + * 是否由于 cancel 而完成。 + */ public val isCancelled: Boolean + + /** + * 挂起直到 `session` 完成任务或被关闭。 + */ + // TODO @ST? public suspend fun join() + + /** + * 注册一个当 `session` 完成任务后执行的回调 [handle]。 + * 也可以通过此回调得知被终止时的异常。 + */ + override fun onCompletion(handle: OnCompletion) } /** + * 一组 `Session` 的元素之一, + * 用来在异步中接收 [ContinuousSessionProvider] 推送的事件 [T] + * 并根据此事件为其返回结果 [R]。 + * + * ```kotlin + * val session = context.session(Key()) { + * val next = await { v -> v.toResult() } // 假设 toResult() 将事件转化为结果 + * } ↑ | + * |-- | ---------------------| + * | |-----------| + * ↓ | + * val result = session.push(value) // 此处得到 v.toResult() 的结果 + * ``` * * @author ForteScarlet */ -public interface ContinuousSessionReceiver { +public interface ContinuousSessionReceiver : CoroutineScope { + /** + * [ContinuousSessionReceiver] 作为 [CoroutineScope] 的协程上下文。 + * 其中不会包含 [Job]。 + */ + override val coroutineContext: CoroutineContext /** * 等待 [ContinuousSessionProvider] 的下一次 [推送][ContinuousSessionProvider.push]。 * - * 在 [await] 的实现中, * 如果在 [await] 过程中出现异常,会在抛出此异常前, * 将此异常使用 [SessionAwaitOnFailureException] * 包装并恢复(`resume`)给 [ContinuousSessionProvider.push]。 * - * * @param result * */ + // TODO @ST? public suspend fun await(result: (T) -> R): T // TODO await 如何延迟响应? } - +/** + * 组合 [ContinuousSessionProvider] 和 [ContinuousSessionReceiver] + * 的 `session` 类型。 + */ public interface ContinuousSession : ContinuousSessionProvider, ContinuousSessionReceiver diff --git a/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/ContinuousSessionContext.kt b/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/ContinuousSessionContext.kt index 6f1c61388..4c58e85dd 100644 --- a/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/ContinuousSessionContext.kt +++ b/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/ContinuousSessionContext.kt @@ -21,45 +21,146 @@ * */ -package love.forte.simbot.extension.continuous.session +@file:JvmName("ContinuousSessionContexts") +@file:JvmMultifileClass -import love.forte.simbot.event.Event -import love.forte.simbot.event.EventResult +package love.forte.simbot.extension.continuous.session +import kotlin.jvm.JvmMultifileClass +import kotlin.jvm.JvmName +/** + * 使用于 [ContinuousSessionContext.session] 中的 `receiver` 逻辑函数。 + */ public fun interface InSession { public suspend fun ContinuousSessionReceiver.invoke() } /** + * 持续会话(`continuous session`)管理器, + * 用于承载一组 [ContinuousSessionProvider] 和 [ContinuousSessionReceiver] + * 的上下文。用于构建与管理 `ContinuousSession`。 + * + * ## 持续会话 + * + * **持续会话(`continuous session`)** 是一种用于解决在同一组逻辑中连续处理多个'事件 [T]'、并响应 '结果 [R]' 的解决方案。 + * [ContinuousSessionContext] 本身可灵活定制这个所谓 '事件' 的类型, + * 其主要应用场景为借助子类型 [EventContinuousSessionContext] 在 simbot 的事件调度中使用。 + * + * 假如,有一个事件类型为 [Int]、响应结果为 [String] 的一组持续会话: + * + * ```kotlin + * // 假设这个 session context 的'事件'类型为 Int, '结果'类型为 String + * val context = ContinuousSessionContext = ... + * val session = context.session(Key()) { // this: ContinuousSessionReceiver + * // receiver 逻辑在异步中,等待外界的事件推送 + * val next = await { it -> it.toString() } + * } ↑ | + * |-------------| | + * | | + * |--------------- | --------| + * | | + * ↓ | + * val result = session.push(1) // 推送 '事件', 得到 '结果' + * + * session.join() // session 内逻辑结束后便会正常终止 + * assertTrue(session.isCompleted) + * ``` + * + * 在 simbot 的事件调度中的简单应用: + * + * ```kotlin + * suspend fun handle(handleEvent: Event, sessionContext: EventContinuousSessionContext): EventResult { + * // event: 接收到的事件 + * // sessionContext: 假设它是在 application 的 plugins 中获取到的. 可参见 `EventContinuousSessionContext` 的文档说明。 + * + * val key = computeKey(handleEvent) // 根据 handleEvent, 分配一个与它的唯一会话对应的 key. + * val session = context.session(key, EXISTING) { + * // this: ContinuousSessionReceiver + * // receiver 逻辑在异步中,等待外界的事件推送 + * // 此处的逻辑: + * // 如果收到的事件 event 经过 check 的判断后符合要求, + * // 则返回 EventResult.empty(isTruncated = true), 代表此会话已经截取此事件, + * // 不要让事件再向后续的其他处理器传递; + * // 否则(即不符合你的业务逻辑判断的条件)则返回一个 EventResult.invalid(), 代表无效的结果。 + * val next = await { event -> + * ↑ if (check(event)) EventResult.empty(isTruncated = true) + * | else EventResult.invalid() + * | } + * } | \-----------------------------------------------------------/ + * | | + * |-------------| | + * | | + * |--------------- | --------| + * ↓ | + * return session.push(handleEvent) // 推送 '事件', 得到 '结果' + * + * // 直接返回这个结果 + * return result + * + * } + * ``` + * + * + * @see EventContinuousSessionContext * * @author ForteScarlet */ public interface ContinuousSessionContext { /** - * 创建,冲突 + * 尝试创建一组 `ContinuousSession` 并返回其中的 [ContinuousSessionProvider]。 + * 在出现 [key] 冲突时基于 [strategy] 策略处理冲突。 + * + * @param key session 会话的标识。[key] 的类型应当是一个可以保证能够作为一个 hash key 的类型, + * 例如基础数据类型(例如 [Int]、[String])、数据类类型(data class)、object 类型等。 + * @param strategy 当 [key] 出现冲突时的处理策略 + * @param inSession 在**异步**中进行 + * @throws ConflictSessionKeyException 如果 [strategy] 为 [ConflictStrategy.FAILURE] 并且出现了冲突 */ - public fun session(key: Any, strategy: ConflictStrategy = ConflictStrategy.FAILURE, inSession: InSession): ContinuousSessionProvider + public fun session( + key: Any, + strategy: ConflictStrategy = ConflictStrategy.FAILURE, + inSession: InSession + ): ContinuousSessionProvider + /** + * 尝试创建一组 `ContinuousSession`, 并在出现 [key] 冲突时基于 [] + */ + public fun session( + key: Any, + inSession: InSession + ): ContinuousSessionProvider = session(key, ConflictStrategy.FAILURE, inSession) + + + /** + * 根据 [key] 获取指定的 [ContinuousSessionProvider] 并在找不到时返回 `null`。 + */ public operator fun get(key: Any): ContinuousSessionProvider? + public operator fun contains(key: Any): Boolean + public fun remove(key: Any): ContinuousSessionProvider? /** - * 创建时的冲突策略 + * 创建会话时的冲突策略 */ public enum class ConflictStrategy { - // 报错 + /** + * 如果已经存在相同 `key` 的值,抛出异常 [ConflictSessionKeyException]。 + * + */ FAILURE, - // 关闭旧的 + + /** + * 关闭旧的现存值,并用提供的新值取代。 + */ REPLACE, - // 获取旧的 - OLD // TODO name + + /** + * 直接返回旧的现存值,忽略新值 + */ + EXISTING } } -/** - * 以事件为中心的 [ContinuousSessionContext] 子类型。 - */ -public interface EventContinuousSessionContext : ContinuousSessionContext diff --git a/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/EventContinuousSessionContext.kt b/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/EventContinuousSessionContext.kt new file mode 100644 index 000000000..d8e7738f0 --- /dev/null +++ b/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/EventContinuousSessionContext.kt @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2024. ForteScarlet. + * + * Project https://github.com/simple-robot/simpler-robot + * Email ForteScarlet@163.com + * + * This file is part of the Simple Robot Library (Alias: simple-robot, simbot, etc.). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + * + */ + +@file:JvmName("EventContinuousSessionContexts") +@file:JvmMultifileClass + +package love.forte.simbot.extension.continuous.session + +import kotlinx.coroutines.Job +import love.forte.simbot.application.Application +import love.forte.simbot.application.ApplicationConfiguration +import love.forte.simbot.common.coroutines.mergeWith +import love.forte.simbot.common.function.ConfigurerFunction +import love.forte.simbot.common.function.invokeWith +import love.forte.simbot.event.Event +import love.forte.simbot.event.EventResult +import love.forte.simbot.plugin.Plugin +import love.forte.simbot.plugin.PluginConfigureContext +import love.forte.simbot.plugin.PluginFactory +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.jvm.JvmMultifileClass +import kotlin.jvm.JvmName + +/** + * 以事件为中心的 [ContinuousSessionContext] 子类型。 + * + * ## 插件 + * + * [EventContinuousSessionContext] 实现 [Plugin], 可以作为 [Application] 的插件安装使用。 + * + * ```kotlin + * launchSimpleApplication { + * install(EventContinuousSessionContext) { + * // 一些可选的配置... + * } + * } + * ``` + * + * [EventContinuousSessionContext] 暂时**不支持SPI**,它需要用户明确的按需加载。 + * + * ## 持续会话 + * + * 有关持续会话等详细说明参阅 [ContinuousSessionContext] 的文档说明。 + * + * @see ContinuousSessionContext + */ +public interface EventContinuousSessionContext : ContinuousSessionContext, Plugin { + + public companion object Factory : + PluginFactory { + override val key: PluginFactory.Key = object : PluginFactory.Key {} + + override fun create( + context: PluginConfigureContext, + configurer: ConfigurerFunction + ): EventContinuousSessionContext { + val config = EventContinuousSessionContextConfiguration() + configurer.invokeWith(config) + val newCoroutineContext = config.coroutineContext.mergeWith(context.applicationConfiguration.coroutineContext) + + return EventContinuousSessionContextImpl(newCoroutineContext) + } + } +} + +private class EventContinuousSessionContextImpl(coroutineContext: CoroutineContext) : + EventContinuousSessionContext, + ContinuousSessionContext by ContinuousSessionContext(coroutineContext) + +/** + * [EventContinuousSessionContext.Factory] 使用的配置类。 + */ +public class EventContinuousSessionContextConfiguration { + /** + * 用于 [EventContinuousSessionContext] 中的协程上下文。 + * 值来自 [ApplicationConfiguration.coroutineContext],但不包含 Job。 + * 如果配置后 [coroutineContext] 存在 [Job], 则会基于此以及 [ApplicationConfiguration.coroutineContext] + * 合成一个新的 [Job]。 + */ + public var coroutineContext: CoroutineContext = EmptyCoroutineContext +} diff --git a/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/exceptions.kt b/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/exceptions.kt index ca43bcf73..5ccd821b3 100644 --- a/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/exceptions.kt +++ b/simbot-extensions/simbot-extension-continuous-session/src/commonMain/kotlin/love/forte/simbot/extension/continuous/session/exceptions.kt @@ -23,6 +23,16 @@ package love.forte.simbot.extension.continuous.session +/** + * 冲突的 session key. + */ +public class ConflictSessionKeyException(message: String?) : IllegalArgumentException(message) + +/** + * 因出现冲突的 session key 而被替换 + */ +public class ReplacedBecauseOfConflictSessionKeyException(message: String?) : IllegalStateException(message) + /** * 当使用 [ContinuousSessionProvider.push] 推送失败时, * 会将异常包装在 [SessionPushOnFailureException.cause] 中。 diff --git a/simbot-extensions/simbot-extension-continuous-session/src/commonTest/kotlin/ContinuousSessionTest.kt b/simbot-extensions/simbot-extension-continuous-session/src/commonTest/kotlin/ContinuousSessionTest.kt index fc5e89148..2f7ac6190 100644 --- a/simbot-extensions/simbot-extension-continuous-session/src/commonTest/kotlin/ContinuousSessionTest.kt +++ b/simbot-extensions/simbot-extension-continuous-session/src/commonTest/kotlin/ContinuousSessionTest.kt @@ -21,11 +21,16 @@ * */ -import kotlinx.coroutines.* +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withTimeoutOrNull +import love.forte.simbot.extension.continuous.session.ContinuousSessionContext +import love.forte.simbot.extension.continuous.session.ContinuousSessionContext.ConflictStrategy.EXISTING +import love.forte.simbot.extension.continuous.session.InSession import love.forte.simbot.extension.continuous.session.SessionAwaitOnFailureException import love.forte.simbot.extension.continuous.session.SessionPushOnFailureException -import love.forte.simbot.extension.continuous.session.SimpleContinuousSessionContext import kotlin.test.* import kotlin.time.Duration.Companion.milliseconds @@ -59,7 +64,7 @@ class ContinuousSessionTest { val parentJob = Job() coroutineScope { val key = Any() - val context = SimpleContinuousSessionContext(Dispatchers.Default + parentJob) + val context = ContinuousSessionContext(Dispatchers.Default + parentJob) val session = context.session(key) { assertEquals(1, await { it.toString() }.also { println("await: $it") }) assertEquals(2, await { it.toString() }.also { println("await: $it") }) @@ -79,8 +84,7 @@ class ContinuousSessionTest { session.join() assertTrue(session.isCompleted) assertFalse(session.isCancelled) - // TODO error! - // assertNull(context[key]) + assertNull(context[key]) } assertTrue(parentJob.isActive) @@ -91,16 +95,14 @@ class ContinuousSessionTest { val parentJob = Job() coroutineScope { val key = Any() - val context = SimpleContinuousSessionContext(Dispatchers.Default + parentJob) + val context = ContinuousSessionContext(Dispatchers.Default + parentJob) val session = context.session(key) { assertEquals(1, await { it.toString() })// .also { println("await: $it") } val ex = assertFails { await { throw IllegalStateException("error on $it") }// .also { println("await: $it") } } - //println("await ex = $ex (@${ex.hashCode()})") assertIs(ex) - //println("Session done") } // launch { @@ -109,13 +111,11 @@ class ContinuousSessionTest { val ex = assertFails { session.push(2)//.also { println("push 2 result: $it") } } - //println("push ex = $ex (@${ex.hashCode()})") assertIs(ex) session.join() assertTrue(session.isCompleted) assertFalse(session.isCancelled) - // TODO error! - // assertNull(context[key]) + assertNull(context[key]) } assertTrue(parentJob.isActive) @@ -124,44 +124,61 @@ class ContinuousSessionTest { @Test fun sessionAwaitTimeoutTest() = runTest { val parentJob = Job() - val context = SimpleContinuousSessionContext(Dispatchers.Default + parentJob) + val context = ContinuousSessionContext(Dispatchers.Default + parentJob) + + val firstTimeoutJob = Job() coroutineScope { val key = Any() val session = context.session(key) { - val v1 = withContext(Dispatchers.Default) { - withTimeoutOrNull(50.milliseconds) { - await { it.toString() } - } + val v1 = withTimeoutOrNull(50.milliseconds) { + await { it.toString() } } + assertNull(v1, "Expected timeout value to be null, but was: $v1") + firstTimeoutJob.complete() assertEquals(1, await { it.toString() }) } - withContext(Dispatchers.Default) { - delay(100.milliseconds) - } + firstTimeoutJob.join() assertEquals("1", session.push(1)) + session.join() + assertTrue(session.isCompleted) assertFalse(session.isCancelled) //println("before context: $context") // Expected value to be null, but was: . // 但是加上 println 就好了 + // map 改成使用同步锁实现后就行了 //println(context) - assertNull(context[key]) - - // val newSession = context.session(key, strategy = ContinuousSessionContext.ConflictStrategy.OLD) {} - // println("oldSession: $session") - // println("newSession: $newSession") - // assertNotEquals(session, newSession) - } - withContext(Dispatchers.Default) { + val gotSession = context[key] + assertNull(context[key], "Expect context[$key] to be null, but was: $gotSession") } assertTrue(parentJob.isActive) } + @Test + fun sessionGetTest() = runTest { + val parentJob = Job() + val context = ContinuousSessionContext(Dispatchers.Default + parentJob) + val key = Any() + + + val inSession = InSession { + suspend fun awaitValue(): Int = await { it.toString() } + assertEquals(1, awaitValue()) + assertEquals(2, awaitValue()) + assertEquals(3, awaitValue()) + } + + fun s() = context.session(key, EXISTING, inSession) + + assertEquals("1", s().push(1)) + assertEquals("2", s().push(2)) + assertEquals("3", s().push(3)) + } }