Skip to content

Commit

Permalink
feat: 实现持续会话的基本内容
Browse files Browse the repository at this point in the history
  • Loading branch information
ForteScarlet committed Feb 7, 2024
1 parent 7a79552 commit c36b9c4
Show file tree
Hide file tree
Showing 7 changed files with 379 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public actual inline fun <K, V> MutableMap<K, V>.removeValue(key: K, crossinline
*
*/
public actual fun <K, V> concurrentMutableMap(): MutableMap<K, V> =
SynchronizedMutableMap(mutableMapOf())
SynchronizedMutableMap(mutableMapOf<K, V>())
// AtomicCopyOnWriteConcurrentMutableMap(emptyMap())

@PublishedApi
Expand Down Expand Up @@ -208,7 +208,7 @@ private class SynchronizedMutableMap<K, V>(private val map: MutableMap<K, V>) :
}


@Deprecated("似乎有Bug")
@Deprecated("似乎有Bug?")
private class AtomicCopyOnWriteConcurrentMutableMap<K, V>(initMap: Map<K, V>) : MutableMap<K, V>,
MutableMapOperators<K, V> {
private open class MapBox<K, out V>(val map: Map<K, V>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
*
*/

@file:JvmName("ContinuousSessionContexts")
@file:JvmMultifileClass

package love.forte.simbot.extension.continuous.session

import kotlinx.coroutines.*
Expand All @@ -36,6 +39,8 @@ import love.forte.simbot.extension.continuous.session.ContinuousSessionContext.C
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName


/**
Expand All @@ -46,6 +51,8 @@ public abstract class AbstractContinuousSessionContext<T, R>(coroutineContext: C
ContinuousSessionContext<T, R> {
protected val sessions: MutableMap<Any, ContinuousSessionProvider<T, R>> = concurrentMutableMap()
protected val launchScope: CoroutineScope = CoroutineScope(coroutineContext)
protected val subScope: CoroutineScope =
if (coroutineContext[Job] == null) launchScope else CoroutineScope(coroutineContext.minusKey(Job))

protected abstract fun computeSession(key: Any, inSession: InSession<T, R>): ContinuousSessionProvider<T, R>

Expand All @@ -56,49 +63,55 @@ public abstract class AbstractContinuousSessionContext<T, R>(coroutineContext: C
): ContinuousSessionProvider<T, R> {
return when (strategy) {
FAILURE -> {
sessions.computeValue(key) { _, old ->
sessions.computeValue(key) { k, old ->
if (old != null) throw IllegalStateException("Session with key $key already exists")

computeSession(key, inSession)
computeSession(k, inSession)
}!!
}

REPLACE -> {
sessions.computeValue(key) { _, old ->
old?.cancel(/* TODO REPLACED EXCEPTION? */)
computeSession(key, inSession)
sessions.computeValue(key) { k, old ->
old?.cancel(ReplacedBecauseOfConflictSessionKeyException("conflict key $k"))
computeSession(k, inSession)
}!!
}

OLD -> {
sessions.computeValueIfAbsent(key) { computeSession(key, inSession) }
EXISTING -> {
sessions.computeValueIfAbsent(key) { k -> computeSession(k, inSession) }
}
}
}

// 是否检测 isActive?

override fun get(key: Any): ContinuousSessionProvider<T, R>? = sessions[key]
override fun contains(key: Any): Boolean = sessions.containsKey(key)
override fun remove(key: Any): ContinuousSessionProvider<T, R>? = sessions[key]
}

/**
* 创建一个 [ContinuousSessionContext] 的基础实现类型。
*/
@JvmName("createContinuousSessionContext")
public fun <T, R> ContinuousSessionContext(coroutineContext: CoroutineContext): ContinuousSessionContext<T, R> =
SimpleContinuousSessionContext(coroutineContext)

internal class SimpleContinuousSessionContext<T, R>(coroutineContext: CoroutineContext) :
private class SimpleContinuousSessionContext<T, R>(coroutineContext: CoroutineContext) :
AbstractContinuousSessionContext<T, R>(coroutineContext) {
private val parentJob = coroutineContext[Job]

// override fun toString(): String {
// // TODO
// return sessions.toString()
// }

override fun computeSession(key: Any, inSession: InSession<T, R>): SimpleSessionImpl<T, R> {
val job = Job(parentJob)
val channel = Channel<SessionData<T, R>>()
val session = SimpleSessionImpl(key, job, channel, launchScope)
val session = SimpleSessionImpl(key, job, channel, subScope)

job.invokeOnCompletion {
channel.close(it)
sessions.removeValue(key) { session }
}
job.invokeOnCompletion {
channel.close(it)
}

launchScope.launch {
kotlin.runCatching {
Expand All @@ -114,15 +127,16 @@ internal class SimpleContinuousSessionContext<T, R>(coroutineContext: CoroutineC
}
}

internal data class SessionData<T, R>(val value: T, val continuation: CancellableContinuation<R>)

private data class SessionData<T, R>(val value: T, val continuation: CancellableContinuation<R>)

internal class SimpleSessionImpl<T, R>(
private class SimpleSessionImpl<T, R>(
private val key: Any,
private val job: CompletableJob,
private val channel: Channel<SessionData<T, R>>,
private val launchScope: CoroutineScope
) : ContinuousSession<T, R> {
override val coroutineContext: CoroutineContext
get() = launchScope.coroutineContext

override val isActive: Boolean
get() = job.isActive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,58 +23,128 @@

package love.forte.simbot.extension.continuous.session

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import love.forte.simbot.ability.CompletionAware
import love.forte.simbot.ability.OnCompletion
import kotlin.coroutines.CoroutineContext


/**
* 一组 `Session` 的元素之一,
* 用来向 [ContinuousSessionReceiver] 推送事件 [T] 并获悉结果 [R]。
*
* ```kotlin
* val session = context.session(Key()) {
* val next = await { v -> v.toResult() } // 假设 toResult() 将事件转化为结果
* } ↑ |
* |-- | ---------------------|
* | |-----------|
* ↓ |
* val result = session.push(value) // 此处得到 v.toResult() 的结果
* ```
*
* @author ForteScarlet
*/
public interface ContinuousSessionProvider<in T, out R> : CompletionAware {
/**
* 推送一个事件到对应的 [ContinuousSessionReceiver] 中并挂起直到将其
* [消费][ContinuousSessionReceiver.await] 或被关闭。
*
* @throws SessionPushOnFailureException 如果推送行为本身失败,
* 例如session已经结束或关闭。
* @throws SessionAwaitOnFailureException 如果推送行为成功、
* 但是在 [ContinuousSessionReceiver.await] 时出现了异常(例如构造响应结果时出现异常)
* @throws
*/
// TODO @ST?
public suspend fun push(value: T): R

/**
* 关闭对应的 `session`。
* 关闭后会同时将对应的 `session` 从对应的 [ContinuousSessionContext] 中移除。
*/
public fun cancel(cause: Throwable?)

/**
* 关闭对应的 `session`。
* 关闭后会同时将对应的 `session` 从对应的 [ContinuousSessionContext] 中移除。
*/
public fun cancel() {
cancel(null)
}

/**
* 是否处于活跃状态。
*/
public val isActive: Boolean

/**
* 是否已经完成。
*/
public val isCompleted: Boolean

/**
* 是否由于 cancel 而完成。
*/
public val isCancelled: Boolean

/**
* 挂起直到 `session` 完成任务或被关闭。
*/
// TODO @ST?
public suspend fun join()

/**
* 注册一个当 `session` 完成任务后执行的回调 [handle]。
* 也可以通过此回调得知被终止时的异常。
*/
override fun onCompletion(handle: OnCompletion)
}

/**
* 一组 `Session` 的元素之一,
* 用来在异步中接收 [ContinuousSessionProvider] 推送的事件 [T]
* 并根据此事件为其返回结果 [R]。
*
* ```kotlin
* val session = context.session(Key()) {
* val next = await { v -> v.toResult() } // 假设 toResult() 将事件转化为结果
* } ↑ |
* |-- | ---------------------|
* | |-----------|
* ↓ |
* val result = session.push(value) // 此处得到 v.toResult() 的结果
* ```
*
* @author ForteScarlet
*/
public interface ContinuousSessionReceiver<out T, R> {
public interface ContinuousSessionReceiver<out T, R> : CoroutineScope {
/**
* [ContinuousSessionReceiver] 作为 [CoroutineScope] 的协程上下文。
* 其中不会包含 [Job]。
*/
override val coroutineContext: CoroutineContext

/**
* 等待 [ContinuousSessionProvider] 的下一次 [推送][ContinuousSessionProvider.push]。
*
* 在 [await] 的实现中,
* 如果在 [await] 过程中出现异常,会在抛出此异常前,
* 将此异常使用 [SessionAwaitOnFailureException]
* 包装并恢复(`resume`)给 [ContinuousSessionProvider.push]。
*
*
* @param result
*
*/
// TODO @ST?
public suspend fun await(result: (T) -> R): T

// TODO await 如何延迟响应?

}


/**
* 组合 [ContinuousSessionProvider] 和 [ContinuousSessionReceiver]
* 的 `session` 类型。
*/
public interface ContinuousSession<T, R> : ContinuousSessionProvider<T, R>, ContinuousSessionReceiver<T, R>
Loading

0 comments on commit c36b9c4

Please sign in to comment.