Skip to content

Commit

Permalink
java block stream test
Browse files Browse the repository at this point in the history
Signed-off-by: akostiucenko <[email protected]>
  • Loading branch information
arndey committed Jul 26, 2023
1 parent f0fcd9a commit 7da7726
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ class BlockStreamSubscription private constructor(private val context: BlockStre
return storage.channel.cast<Channel<T>>().receiveAsFlow().catch { storage.onFailure(it) }
}

fun <T> receiveBlockingJava(actionId: UUID): Flow<T> {
val storage = source[actionId] ?: throw IrohaSdkException("Flow#$actionId not found")
return storage.channel.cast<Channel<T>>().receiveAsFlow().catch { storage.onFailure(it) }
}

private suspend fun run() {
var counter = 0
val request = VersionedBlockSubscriptionRequest.V1(BlockSubscriptionRequest(BigInteger.valueOf(context.from)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class QueryBuilder<R>(

companion object {
@JvmStatic
@JvmOverloads
fun findAllAccounts(queryFilter: GenericPredicateBox<ValuePredicate>? = null) = QueryBuilder(
Queries.findAllAccounts(),
AccountsExtractor,
Expand All @@ -110,6 +111,7 @@ class QueryBuilder<R>(
) = findAccountKeyValueByIdAndKey(accountId, key.asName())

@JvmStatic
@JvmOverloads
fun findAccountsByName(
name: Name,
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
Expand All @@ -120,25 +122,29 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findAccountsByDomainId(
domainId: DomainId,
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
) = QueryBuilder(Queries.findAccountsByDomainId(domainId), AccountsExtractor, queryFilter)

@JvmStatic
@JvmOverloads
fun findAccountsWithAsset(
definitionId: AssetDefinitionId,
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
) = QueryBuilder(Queries.findAccountsWithAsset(definitionId), AccountsExtractor, queryFilter)

@JvmStatic
@JvmOverloads
fun findAllAssets(queryFilter: GenericPredicateBox<ValuePredicate>? = null) = QueryBuilder(
Queries.findAllAssets(),
AssetsExtractor,
queryFilter,
)

@JvmStatic
@JvmOverloads
fun findAllAssetsDefinitions(
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
) = QueryBuilder(
Expand All @@ -148,6 +154,7 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findAssetsByName(
name: Name,
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
Expand All @@ -158,6 +165,7 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findAssetsByAccountId(
accountId: AccountId,
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
Expand All @@ -174,6 +182,7 @@ class QueryBuilder<R>(
fun findAssetById(assetId: AssetId) = QueryBuilder(Queries.findAssetById(assetId), AssetExtractor)

@JvmStatic
@JvmOverloads
fun findAssetsByDomainId(
domainId: DomainId,
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
Expand All @@ -190,6 +199,7 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findAllAssetsDefinitions(
assetDefinition: AssetDefinitionId,
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
Expand All @@ -200,6 +210,7 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findAssetsByDomainIdAndAssetDefinitionId(
domainId: DomainId,
assetDefinition: AssetDefinitionId,
Expand Down Expand Up @@ -256,6 +267,7 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findAllDomains(
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
) = QueryBuilder(Queries.findAllDomains(), DomainsExtractor, queryFilter)
Expand All @@ -264,10 +276,12 @@ class QueryBuilder<R>(
fun findDomainById(domainId: DomainId) = QueryBuilder(Queries.findDomainById(domainId), DomainExtractor)

@JvmStatic
@JvmOverloads
fun findAllPeers(queryFilter: GenericPredicateBox<ValuePredicate>? = null) =
QueryBuilder(Queries.findAllPeers(), PeersExtractor, queryFilter)

@JvmStatic
@JvmOverloads
fun findTransactionsByAccountId(
accountId: AccountId,
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
Expand All @@ -278,6 +292,7 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findPermissionTokensByAccountId(
accountId: AccountId,
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
Expand All @@ -288,6 +303,7 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findAllPermissionTokenDefinitions(
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
) = QueryBuilder(
Expand All @@ -297,6 +313,7 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findRolesByAccountId(accountId: AccountId, queryFilter: GenericPredicateBox<ValuePredicate>? = null) =
QueryBuilder(
Queries.findRolesByAccountId(accountId),
Expand All @@ -305,20 +322,23 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findAllRoleIds(queryFilter: GenericPredicateBox<ValuePredicate>? = null) = QueryBuilder(
Queries.findAllRoleIds(),
RoleIdsExtractor,
queryFilter,
)

@JvmStatic
@JvmOverloads
fun findAllRoles(queryFilter: GenericPredicateBox<ValuePredicate>? = null) = QueryBuilder(
Queries.findAllRoles(),
RolesExtractor,
queryFilter,
)

@JvmStatic
@JvmOverloads
fun findRoleByRoleId(roleId: RoleId, queryFilter: GenericPredicateBox<ValuePredicate>? = null) =
QueryBuilder(
Queries.findRoleByRoleId(roleId),
Expand All @@ -339,20 +359,23 @@ class QueryBuilder<R>(
fun findTransactionByHash(hex: String) = findTransactionByHash(hex.fromHex().hash().toIrohaHash())

@JvmStatic
@JvmOverloads
fun findAllTransactions(queryFilter: GenericPredicateBox<ValuePredicate>? = null) = QueryBuilder(
Queries.findAllTransactions(),
TransactionQueryResultExtractor,
queryFilter,
)

@JvmStatic
@JvmOverloads
fun findAllBlocks(queryFilter: GenericPredicateBox<ValuePredicate>? = null) = QueryBuilder(
Queries.findAllBlocks(),
BlocksValueExtractor,
queryFilter,
)

@JvmStatic
@JvmOverloads
fun findAllBlockHeaders(queryFilter: GenericPredicateBox<ValuePredicate>? = null) = QueryBuilder(
Queries.findAllBlockHeaders(),
BlockHeadersExtractor,
Expand Down Expand Up @@ -384,13 +407,15 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findAllActiveTriggerIds(queryFilter: GenericPredicateBox<ValuePredicate>? = null) = QueryBuilder(
Queries.findAllActiveTriggerIds(),
TriggerIdsExtractor,
queryFilter,
)

@JvmStatic
@JvmOverloads
fun findTriggersByDomainId(
domainId: DomainId,
queryFilter: GenericPredicateBox<ValuePredicate>? = null,
Expand All @@ -401,6 +426,7 @@ class QueryBuilder<R>(
)

@JvmStatic
@JvmOverloads
fun findAllParameters(queryFilter: GenericPredicateBox<ValuePredicate>? = null) = QueryBuilder(
Queries.findAllParameters(),
ValueExtractor,
Expand Down
80 changes: 62 additions & 18 deletions modules/client/src/test/java/jp/co/soramitsu/iroha2/JavaTest.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
package jp.co.soramitsu.iroha2;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import jp.co.soramitsu.iroha2.client.Iroha2AsyncClient;
import jp.co.soramitsu.iroha2.generated.Account;
import jp.co.soramitsu.iroha2.generated.AccountId;
import jp.co.soramitsu.iroha2.generated.AssetId;
import jp.co.soramitsu.iroha2.generated.AssetValue;
import jp.co.soramitsu.iroha2.generated.AssetValueType;
import jp.co.soramitsu.iroha2.generated.Domain;
import jp.co.soramitsu.iroha2.generated.DomainId;
import jp.co.soramitsu.iroha2.client.blockstream.*;
import jp.co.soramitsu.iroha2.generated.*;
import jp.co.soramitsu.iroha2.generated.Metadata;
import jp.co.soramitsu.iroha2.generated.Name;
import jp.co.soramitsu.iroha2.generated.Value;
import jp.co.soramitsu.iroha2.generated.VersionedSignedTransaction;
import jp.co.soramitsu.iroha2.query.QueryAndExtractor;
import jp.co.soramitsu.iroha2.query.QueryBuilder;
import jp.co.soramitsu.iroha2.testengine.DefaultGenesis;
import jp.co.soramitsu.iroha2.testengine.IrohaTest;
import jp.co.soramitsu.iroha2.testengine.WithIroha;
import jp.co.soramitsu.iroha2.transaction.TransactionBuilder;
import kotlin.*;
import kotlin.Pair;
import kotlin.coroutines.*;
import kotlinx.coroutines.*;
import static kotlinx.coroutines.BuildersKt.runBlocking;
import kotlinx.coroutines.flow.*;
import static org.apache.commons.lang3.RandomStringUtils.random;
import org.jetbrains.annotations.*;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -49,7 +47,7 @@ public void instructionFailed() {

@Test
@WithIroha(sources = DefaultGenesis.class)
public void registerDomainInstructionCommitted() throws ExecutionException, InterruptedException, TimeoutException {
public void registerDomain() throws ExecutionException, InterruptedException, TimeoutException {
final DomainId domainId = new DomainId(new Name("new_domain_name"));
final VersionedSignedTransaction transaction = TransactionBuilder.Companion.builder()
.account(ALICE_ACCOUNT_ID)
Expand All @@ -67,7 +65,7 @@ public void registerDomainInstructionCommitted() throws ExecutionException, Inte

@Test
@WithIroha(sources = DefaultGenesis.class)
public void registerAccountInstructionCommitted() throws Exception {
public void registerAccount() throws Exception {
final AccountId accountId = new AccountId(new Name("new_account"), DEFAULT_DOMAIN_ID);
final VersionedSignedTransaction transaction = TransactionBuilder.Companion.builder()
.account(ALICE_ACCOUNT_ID)
Expand All @@ -85,7 +83,7 @@ public void registerAccountInstructionCommitted() throws Exception {

@Test
@WithIroha(sources = DefaultGenesis.class)
public void mintAssetInstructionCommitted() throws Exception {
public void mintAsset() throws Exception {
final VersionedSignedTransaction registerAssetTx = TransactionBuilder.Companion.builder()
.account(ALICE_ACCOUNT_ID)
.registerAssetDefinition(DEFAULT_ASSET_DEFINITION_ID, new AssetValueType.Quantity())
Expand All @@ -109,7 +107,7 @@ public void mintAssetInstructionCommitted() throws Exception {

@Test
@WithIroha(sources = DefaultGenesis.class)
public void updateKeyValueInstructionCommitted() throws Exception {
public void updateKeyValue() throws Exception {
final Name assetMetadataKey = new Name("asset_metadata_key");
final Value.String assetMetadataValue = new Value.String("some string value");
final Value.String assetMetadataValue2 = new Value.String("some string value 2");
Expand Down Expand Up @@ -142,7 +140,7 @@ public void updateKeyValueInstructionCommitted() throws Exception {

@Test
@WithIroha(sources = DefaultGenesis.class)
public void setKeyValueInstructionCommitted() throws Exception {
public void setKeyValue() throws Exception {
final Value.String assetValue = new Value.String("some string value");
final Name assetKey = new Name("asset_metadata_key");

Expand All @@ -167,4 +165,50 @@ public void setKeyValueInstructionCommitted() throws Exception {
final Value value = future.get(10, TimeUnit.SECONDS);
Assertions.assertEquals(((Value.String) value).getString(), assetValue.getString());
}

@Test
@WithIroha(sources = DefaultGenesis.class)
public void blockStreaming() throws ExecutionException, InterruptedException {
int count = 5;
Pair<UUID, BlockStreamSubscription> idToSubscription = client.subscribeToBlockStream(1, count);
UUID actionId = idToSubscription.component1();
BlockStreamSubscription subscription = idToSubscription.component2();

List<VersionedBlockMessage> blocks = new ArrayList<>();
subscription.receive(actionId, new BlockMessageCollector(blocks));

for (int i = 0; i <= count + 1; i++) {
final VersionedSignedTransaction transaction = TransactionBuilder.Companion.builder()
.account(ALICE_ACCOUNT_ID)
.setKeyValue(ALICE_ACCOUNT_ID, new Name(random(10)), new Value.String(random(10)))
.buildSigned(ALICE_KEYPAIR);
client.sendTransactionAsync(transaction);
}

QueryAndExtractor<List<VersionedCommittedBlock>> query = QueryBuilder.findAllBlocks()
.account(ALICE_ACCOUNT_ID)
.buildSigned(ALICE_KEYPAIR);
Integer blocksSize = client.sendQueryAsync(query).get().size();

Assertions.assertEquals(blocksSize, blocks.size());
}

static class BlockMessageCollector implements FlowCollector<VersionedBlockMessage> {

List<VersionedBlockMessage> blocks;

public BlockMessageCollector(List<VersionedBlockMessage> blocks) {
this.blocks = blocks;
}

@Nullable
@Override
public Object emit(
VersionedBlockMessage versionedBlockMessage,
@NotNull Continuation<? super Unit> continuation
) {
blocks.add(versionedBlockMessage);
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jp.co.soramitsu.iroha2

import io.qameta.allure.Feature
import io.qameta.allure.Issue
import io.qameta.allure.Owner
import io.qameta.allure.Story
import jp.co.soramitsu.iroha2.annotations.Sdk
Expand Down Expand Up @@ -39,6 +40,7 @@ class BlockStreamTest : IrohaTest<Iroha2Client>() {
@WithIroha([NewAccountWithMetadata::class])
@Story("Successful subscription to block stream")
@SdkTestId("subscription_to_block_stream")
@Issue("https://app.zenhub.com/workspaces/iroha-v2-60ddb820813b9100181fc060/issues/gh/hyperledger/iroha-java/361")
fun `subscription to block stream`(): Unit = runBlocking {
val idToSubscription = client.subscribeToBlockStream(from = 1, count = 2)
val actionId = idToSubscription.first
Expand Down Expand Up @@ -69,15 +71,14 @@ class BlockStreamTest : IrohaTest<Iroha2Client>() {
assertEquals(newAssetName, newAssetDefinition.id.name.string)
assertEquals(DEFAULT_DOMAIN, newAssetDefinition.id.domainId.asString())

// get the last block second time
blocks = mutableListOf()
subscription.receiveBlocking<VersionedBlockMessage>(actionId).collect { block -> blocks.add(block) }
isi = checkBlockStructure(blocks[0], 2, DEFAULT_DOMAIN, BOB_ACCOUNT, 1)

newAssetDefinition = isi[0].cast<InstructionBox.Register>().extractAssetDefinition()
assertNotNull(newAssetDefinition)
assertEquals(newAssetName, newAssetDefinition.id.name.string)
assertEquals(DEFAULT_DOMAIN, newAssetDefinition.id.domainId.asString())
// blocks = mutableListOf()
// subscription.receiveBlocking<VersionedBlockMessage>(actionId).collect { block -> blocks.add(block) }
// isi = checkBlockStructure(blocks[0], 2, DEFAULT_DOMAIN, BOB_ACCOUNT, 1)
//
// newAssetDefinition = isi[0].cast<InstructionBox.Register>().extractAssetDefinition()
// assertNotNull(newAssetDefinition)
// assertEquals(newAssetName, newAssetDefinition.id.name.string)
// assertEquals(DEFAULT_DOMAIN, newAssetDefinition.id.domainId.asString())
}

@Test
Expand Down

0 comments on commit 7da7726

Please sign in to comment.