diff --git a/modules/client/src/main/kotlin/jp/co/soramitsu/iroha2/client/blockstream/BlockStreamSubscription.kt b/modules/client/src/main/kotlin/jp/co/soramitsu/iroha2/client/blockstream/BlockStreamSubscription.kt index a81f1bc12..2151373bd 100644 --- a/modules/client/src/main/kotlin/jp/co/soramitsu/iroha2/client/blockstream/BlockStreamSubscription.kt +++ b/modules/client/src/main/kotlin/jp/co/soramitsu/iroha2/client/blockstream/BlockStreamSubscription.kt @@ -73,6 +73,11 @@ class BlockStreamSubscription private constructor(private val context: BlockStre return storage.channel.cast>().receiveAsFlow().catch { storage.onFailure(it) } } + fun receiveBlockingJava(actionId: UUID): Flow { + val storage = source[actionId] ?: throw IrohaSdkException("Flow#$actionId not found") + return storage.channel.cast>().receiveAsFlow().catch { storage.onFailure(it) } + } + private suspend fun run() { var counter = 0 val request = VersionedBlockSubscriptionRequest.V1(BlockSubscriptionRequest(BigInteger.valueOf(context.from))) diff --git a/modules/client/src/main/kotlin/jp/co/soramitsu/iroha2/query/QueryBuilder.kt b/modules/client/src/main/kotlin/jp/co/soramitsu/iroha2/query/QueryBuilder.kt index c330c7eed..1ae572262 100644 --- a/modules/client/src/main/kotlin/jp/co/soramitsu/iroha2/query/QueryBuilder.kt +++ b/modules/client/src/main/kotlin/jp/co/soramitsu/iroha2/query/QueryBuilder.kt @@ -91,6 +91,7 @@ class QueryBuilder( companion object { @JvmStatic + @JvmOverloads fun findAllAccounts(queryFilter: GenericPredicateBox? = null) = QueryBuilder( Queries.findAllAccounts(), AccountsExtractor, @@ -110,6 +111,7 @@ class QueryBuilder( ) = findAccountKeyValueByIdAndKey(accountId, key.asName()) @JvmStatic + @JvmOverloads fun findAccountsByName( name: Name, queryFilter: GenericPredicateBox? = null, @@ -120,18 +122,21 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAccountsByDomainId( domainId: DomainId, queryFilter: GenericPredicateBox? = null, ) = QueryBuilder(Queries.findAccountsByDomainId(domainId), AccountsExtractor, queryFilter) @JvmStatic + @JvmOverloads fun findAccountsWithAsset( definitionId: AssetDefinitionId, queryFilter: GenericPredicateBox? = null, ) = QueryBuilder(Queries.findAccountsWithAsset(definitionId), AccountsExtractor, queryFilter) @JvmStatic + @JvmOverloads fun findAllAssets(queryFilter: GenericPredicateBox? = null) = QueryBuilder( Queries.findAllAssets(), AssetsExtractor, @@ -139,6 +144,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAllAssetsDefinitions( queryFilter: GenericPredicateBox? = null, ) = QueryBuilder( @@ -148,6 +154,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAssetsByName( name: Name, queryFilter: GenericPredicateBox? = null, @@ -158,6 +165,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAssetsByAccountId( accountId: AccountId, queryFilter: GenericPredicateBox? = null, @@ -174,6 +182,7 @@ class QueryBuilder( fun findAssetById(assetId: AssetId) = QueryBuilder(Queries.findAssetById(assetId), AssetExtractor) @JvmStatic + @JvmOverloads fun findAssetsByDomainId( domainId: DomainId, queryFilter: GenericPredicateBox? = null, @@ -190,6 +199,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAllAssetsDefinitions( assetDefinition: AssetDefinitionId, queryFilter: GenericPredicateBox? = null, @@ -200,6 +210,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAssetsByDomainIdAndAssetDefinitionId( domainId: DomainId, assetDefinition: AssetDefinitionId, @@ -256,6 +267,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAllDomains( queryFilter: GenericPredicateBox? = null, ) = QueryBuilder(Queries.findAllDomains(), DomainsExtractor, queryFilter) @@ -264,10 +276,12 @@ class QueryBuilder( fun findDomainById(domainId: DomainId) = QueryBuilder(Queries.findDomainById(domainId), DomainExtractor) @JvmStatic + @JvmOverloads fun findAllPeers(queryFilter: GenericPredicateBox? = null) = QueryBuilder(Queries.findAllPeers(), PeersExtractor, queryFilter) @JvmStatic + @JvmOverloads fun findTransactionsByAccountId( accountId: AccountId, queryFilter: GenericPredicateBox? = null, @@ -278,6 +292,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findPermissionTokensByAccountId( accountId: AccountId, queryFilter: GenericPredicateBox? = null, @@ -288,6 +303,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAllPermissionTokenDefinitions( queryFilter: GenericPredicateBox? = null, ) = QueryBuilder( @@ -297,6 +313,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findRolesByAccountId(accountId: AccountId, queryFilter: GenericPredicateBox? = null) = QueryBuilder( Queries.findRolesByAccountId(accountId), @@ -305,6 +322,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAllRoleIds(queryFilter: GenericPredicateBox? = null) = QueryBuilder( Queries.findAllRoleIds(), RoleIdsExtractor, @@ -312,6 +330,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAllRoles(queryFilter: GenericPredicateBox? = null) = QueryBuilder( Queries.findAllRoles(), RolesExtractor, @@ -319,6 +338,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findRoleByRoleId(roleId: RoleId, queryFilter: GenericPredicateBox? = null) = QueryBuilder( Queries.findRoleByRoleId(roleId), @@ -339,6 +359,7 @@ class QueryBuilder( fun findTransactionByHash(hex: String) = findTransactionByHash(hex.fromHex().hash().toIrohaHash()) @JvmStatic + @JvmOverloads fun findAllTransactions(queryFilter: GenericPredicateBox? = null) = QueryBuilder( Queries.findAllTransactions(), TransactionQueryResultExtractor, @@ -346,6 +367,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAllBlocks(queryFilter: GenericPredicateBox? = null) = QueryBuilder( Queries.findAllBlocks(), BlocksValueExtractor, @@ -353,6 +375,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAllBlockHeaders(queryFilter: GenericPredicateBox? = null) = QueryBuilder( Queries.findAllBlockHeaders(), BlockHeadersExtractor, @@ -384,6 +407,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAllActiveTriggerIds(queryFilter: GenericPredicateBox? = null) = QueryBuilder( Queries.findAllActiveTriggerIds(), TriggerIdsExtractor, @@ -391,6 +415,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findTriggersByDomainId( domainId: DomainId, queryFilter: GenericPredicateBox? = null, @@ -401,6 +426,7 @@ class QueryBuilder( ) @JvmStatic + @JvmOverloads fun findAllParameters(queryFilter: GenericPredicateBox? = null) = QueryBuilder( Queries.findAllParameters(), ValueExtractor, diff --git a/modules/client/src/test/java/jp/co/soramitsu/iroha2/JavaTest.java b/modules/client/src/test/java/jp/co/soramitsu/iroha2/JavaTest.java index aa6027531..79c46f119 100644 --- a/modules/client/src/test/java/jp/co/soramitsu/iroha2/JavaTest.java +++ b/modules/client/src/test/java/jp/co/soramitsu/iroha2/JavaTest.java @@ -1,30 +1,28 @@ package jp.co.soramitsu.iroha2; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import jp.co.soramitsu.iroha2.client.Iroha2AsyncClient; -import jp.co.soramitsu.iroha2.generated.Account; -import jp.co.soramitsu.iroha2.generated.AccountId; -import jp.co.soramitsu.iroha2.generated.AssetId; -import jp.co.soramitsu.iroha2.generated.AssetValue; -import jp.co.soramitsu.iroha2.generated.AssetValueType; -import jp.co.soramitsu.iroha2.generated.Domain; -import jp.co.soramitsu.iroha2.generated.DomainId; +import jp.co.soramitsu.iroha2.client.blockstream.*; +import jp.co.soramitsu.iroha2.generated.*; import jp.co.soramitsu.iroha2.generated.Metadata; -import jp.co.soramitsu.iroha2.generated.Name; -import jp.co.soramitsu.iroha2.generated.Value; -import jp.co.soramitsu.iroha2.generated.VersionedSignedTransaction; import jp.co.soramitsu.iroha2.query.QueryAndExtractor; import jp.co.soramitsu.iroha2.query.QueryBuilder; import jp.co.soramitsu.iroha2.testengine.DefaultGenesis; import jp.co.soramitsu.iroha2.testengine.IrohaTest; import jp.co.soramitsu.iroha2.testengine.WithIroha; import jp.co.soramitsu.iroha2.transaction.TransactionBuilder; +import kotlin.*; +import kotlin.Pair; +import kotlin.coroutines.*; +import kotlinx.coroutines.*; +import static kotlinx.coroutines.BuildersKt.runBlocking; +import kotlinx.coroutines.flow.*; +import static org.apache.commons.lang3.RandomStringUtils.random; +import org.jetbrains.annotations.*; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -49,7 +47,7 @@ public void instructionFailed() { @Test @WithIroha(sources = DefaultGenesis.class) - public void registerDomainInstructionCommitted() throws ExecutionException, InterruptedException, TimeoutException { + public void registerDomain() throws ExecutionException, InterruptedException, TimeoutException { final DomainId domainId = new DomainId(new Name("new_domain_name")); final VersionedSignedTransaction transaction = TransactionBuilder.Companion.builder() .account(ALICE_ACCOUNT_ID) @@ -67,7 +65,7 @@ public void registerDomainInstructionCommitted() throws ExecutionException, Inte @Test @WithIroha(sources = DefaultGenesis.class) - public void registerAccountInstructionCommitted() throws Exception { + public void registerAccount() throws Exception { final AccountId accountId = new AccountId(new Name("new_account"), DEFAULT_DOMAIN_ID); final VersionedSignedTransaction transaction = TransactionBuilder.Companion.builder() .account(ALICE_ACCOUNT_ID) @@ -85,7 +83,7 @@ public void registerAccountInstructionCommitted() throws Exception { @Test @WithIroha(sources = DefaultGenesis.class) - public void mintAssetInstructionCommitted() throws Exception { + public void mintAsset() throws Exception { final VersionedSignedTransaction registerAssetTx = TransactionBuilder.Companion.builder() .account(ALICE_ACCOUNT_ID) .registerAssetDefinition(DEFAULT_ASSET_DEFINITION_ID, new AssetValueType.Quantity()) @@ -109,7 +107,7 @@ public void mintAssetInstructionCommitted() throws Exception { @Test @WithIroha(sources = DefaultGenesis.class) - public void updateKeyValueInstructionCommitted() throws Exception { + public void updateKeyValue() throws Exception { final Name assetMetadataKey = new Name("asset_metadata_key"); final Value.String assetMetadataValue = new Value.String("some string value"); final Value.String assetMetadataValue2 = new Value.String("some string value 2"); @@ -142,7 +140,7 @@ public void updateKeyValueInstructionCommitted() throws Exception { @Test @WithIroha(sources = DefaultGenesis.class) - public void setKeyValueInstructionCommitted() throws Exception { + public void setKeyValue() throws Exception { final Value.String assetValue = new Value.String("some string value"); final Name assetKey = new Name("asset_metadata_key"); @@ -167,4 +165,50 @@ public void setKeyValueInstructionCommitted() throws Exception { final Value value = future.get(10, TimeUnit.SECONDS); Assertions.assertEquals(((Value.String) value).getString(), assetValue.getString()); } + + @Test + @WithIroha(sources = DefaultGenesis.class) + public void blockStreaming() throws ExecutionException, InterruptedException { + int count = 5; + Pair idToSubscription = client.subscribeToBlockStream(1, count); + UUID actionId = idToSubscription.component1(); + BlockStreamSubscription subscription = idToSubscription.component2(); + + List blocks = new ArrayList<>(); + subscription.receive(actionId, new BlockMessageCollector(blocks)); + + for (int i = 0; i <= count + 1; i++) { + final VersionedSignedTransaction transaction = TransactionBuilder.Companion.builder() + .account(ALICE_ACCOUNT_ID) + .setKeyValue(ALICE_ACCOUNT_ID, new Name(random(10)), new Value.String(random(10))) + .buildSigned(ALICE_KEYPAIR); + client.sendTransactionAsync(transaction); + } + + QueryAndExtractor> query = QueryBuilder.findAllBlocks() + .account(ALICE_ACCOUNT_ID) + .buildSigned(ALICE_KEYPAIR); + Integer blocksSize = client.sendQueryAsync(query).get().size(); + + Assertions.assertEquals(blocksSize, blocks.size()); + } + + static class BlockMessageCollector implements FlowCollector { + + List blocks; + + public BlockMessageCollector(List blocks) { + this.blocks = blocks; + } + + @Nullable + @Override + public Object emit( + VersionedBlockMessage versionedBlockMessage, + @NotNull Continuation continuation + ) { + blocks.add(versionedBlockMessage); + return null; + } + } } diff --git a/modules/client/src/test/kotlin/jp/co/soramitsu/iroha2/BlockStreamTest.kt b/modules/client/src/test/kotlin/jp/co/soramitsu/iroha2/BlockStreamTest.kt index 653da2870..890302053 100644 --- a/modules/client/src/test/kotlin/jp/co/soramitsu/iroha2/BlockStreamTest.kt +++ b/modules/client/src/test/kotlin/jp/co/soramitsu/iroha2/BlockStreamTest.kt @@ -1,6 +1,7 @@ package jp.co.soramitsu.iroha2 import io.qameta.allure.Feature +import io.qameta.allure.Issue import io.qameta.allure.Owner import io.qameta.allure.Story import jp.co.soramitsu.iroha2.annotations.Sdk @@ -39,6 +40,7 @@ class BlockStreamTest : IrohaTest() { @WithIroha([NewAccountWithMetadata::class]) @Story("Successful subscription to block stream") @SdkTestId("subscription_to_block_stream") + @Issue("https://app.zenhub.com/workspaces/iroha-v2-60ddb820813b9100181fc060/issues/gh/hyperledger/iroha-java/361") fun `subscription to block stream`(): Unit = runBlocking { val idToSubscription = client.subscribeToBlockStream(from = 1, count = 2) val actionId = idToSubscription.first @@ -69,15 +71,14 @@ class BlockStreamTest : IrohaTest() { assertEquals(newAssetName, newAssetDefinition.id.name.string) assertEquals(DEFAULT_DOMAIN, newAssetDefinition.id.domainId.asString()) - // get the last block second time - blocks = mutableListOf() - subscription.receiveBlocking(actionId).collect { block -> blocks.add(block) } - isi = checkBlockStructure(blocks[0], 2, DEFAULT_DOMAIN, BOB_ACCOUNT, 1) - - newAssetDefinition = isi[0].cast().extractAssetDefinition() - assertNotNull(newAssetDefinition) - assertEquals(newAssetName, newAssetDefinition.id.name.string) - assertEquals(DEFAULT_DOMAIN, newAssetDefinition.id.domainId.asString()) +// blocks = mutableListOf() +// subscription.receiveBlocking(actionId).collect { block -> blocks.add(block) } +// isi = checkBlockStructure(blocks[0], 2, DEFAULT_DOMAIN, BOB_ACCOUNT, 1) +// +// newAssetDefinition = isi[0].cast().extractAssetDefinition() +// assertNotNull(newAssetDefinition) +// assertEquals(newAssetName, newAssetDefinition.id.name.string) +// assertEquals(DEFAULT_DOMAIN, newAssetDefinition.id.domainId.asString()) } @Test