Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: akostiucenko <[email protected]>
  • Loading branch information
arndey committed Jul 26, 2023
1 parent 7da7726 commit e68341e
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ open class SingletonHolder<out T : Any, in A>(creator: (A) -> T) {
@Volatile
private var instance: T? = null

open fun destroy() {
instance = null
}

fun getInstance(arg: A): T {
val checkInstance = instance
if (checkInstance != null) {
Expand All @@ -19,7 +23,6 @@ open class SingletonHolder<out T : Any, in A>(creator: (A) -> T) {
} else {
val created = creator!!(arg)
instance = created
creator = null
created
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import jp.co.soramitsu.iroha2.generated.VersionedBlockSubscriptionRequest
import jp.co.soramitsu.iroha2.toFrame
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
Expand All @@ -35,7 +35,6 @@ class BlockStreamSubscription private constructor(private val context: BlockStre
private val source: MutableMap<UUID, BlockStreamStorage> = mutableMapOf()
private var initialStorageId: UUID? = null
private var stopped: Boolean = false
private val jobs: MutableList<Job> = mutableListOf()

fun subscribe(): Pair<UUID, BlockStreamSubscription> {
if (initialStorageId == null) {
Expand All @@ -48,7 +47,9 @@ class BlockStreamSubscription private constructor(private val context: BlockStre

fun unsubscribe() {
stopped = true
jobs.forEach { it.cancel() }
this.cancel()
destroy()

logger.info("Unsubscribed from block stream")
}

Expand All @@ -66,14 +67,9 @@ class BlockStreamSubscription private constructor(private val context: BlockStre

fun <T> receive(actionId: UUID, collector: FlowCollector<T>) = launch {
receiveBlocking<T>(actionId).collect(collector)
}.also { jobs.add(it) }

fun <T> receiveBlocking(actionId: UUID): Flow<T> {
val storage = source[actionId] ?: throw IrohaSdkException("Flow#$actionId not found")
return storage.channel.cast<Channel<T>>().receiveAsFlow().catch { storage.onFailure(it) }
}

fun <T> receiveBlockingJava(actionId: UUID): Flow<T> {
fun <T> receiveBlocking(actionId: UUID): Flow<T> {
val storage = source[actionId] ?: throw IrohaSdkException("Flow#$actionId not found")
return storage.channel.cast<Channel<T>>().receiveAsFlow().catch { storage.onFailure(it) }
}
Expand All @@ -96,7 +92,6 @@ class BlockStreamSubscription private constructor(private val context: BlockStre
source.closeAndClear()
return@webSocket
}

logger.debug("Received frame: {}", frame)

val block = VersionedBlockMessage.decode(frame.readBytes())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import jp.co.soramitsu.iroha2.testengine.NewAccountWithMetadata
import jp.co.soramitsu.iroha2.testengine.WithIroha
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.parallel.ResourceLock
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils.random
import java.math.BigInteger
import kotlin.test.assertEquals
Expand All @@ -41,6 +42,7 @@ class BlockStreamTest : IrohaTest<Iroha2Client>() {
@Story("Successful subscription to block stream")
@SdkTestId("subscription_to_block_stream")
@Issue("https://app.zenhub.com/workspaces/iroha-v2-60ddb820813b9100181fc060/issues/gh/hyperledger/iroha-java/361")
@ResourceLock("blockStream")
fun `subscription to block stream`(): Unit = runBlocking {
val idToSubscription = client.subscribeToBlockStream(from = 1, count = 2)
val actionId = idToSubscription.first
Expand Down Expand Up @@ -71,6 +73,8 @@ class BlockStreamTest : IrohaTest<Iroha2Client>() {
assertEquals(newAssetName, newAssetDefinition.id.name.string)
assertEquals(DEFAULT_DOMAIN, newAssetDefinition.id.domainId.asString())

subscription.unsubscribe()

// blocks = mutableListOf()
// subscription.receiveBlocking<VersionedBlockMessage>(actionId).collect { block -> blocks.add(block) }
// isi = checkBlockStructure(blocks[0], 2, DEFAULT_DOMAIN, BOB_ACCOUNT, 1)
Expand All @@ -85,6 +89,7 @@ class BlockStreamTest : IrohaTest<Iroha2Client>() {
@WithIroha([DefaultGenesis::class])
@Story("Successful subscription to endless block stream")
@SdkTestId("subscription_to_endless_block_stream")
@ResourceLock("blockStream")
fun `subscription to endless block stream`(): Unit = runBlocking {
val repeatTimes = 5
val shift = 1 // to test not to take more than was ordered
Expand Down

0 comments on commit e68341e

Please sign in to comment.