Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resubscribe to the blocks streaming #401

Open
bbr88 opened this issue Jan 16, 2024 · 0 comments
Open

Resubscribe to the blocks streaming #401

bbr88 opened this issue Jan 16, 2024 · 0 comments
Assignees
Labels

Comments

@bbr88
Copy link

bbr88 commented Jan 16, 2024

Rationale

The current implementation does not consider reopening a subscription to the blocks streaming. In the case of a temporal network issue or if the node currently streaming blocks goes down, it's impossible to subscribe again. Attempts to resubscribe fail with the following exception:

jp.co.soramitsu.iroha2.IrohaSdkException: Flow#73dd19b2-d6c6-4c1a-b355-2fa191e0d6ac not found
	at jp.co.soramitsu.iroha2.client.blockstream.BlockStreamSubscription.receive(BlockStreamSubscription.kt:87)
	at jp.co.soramitsu.iroha2_adapter.service.IrohaService.initBlockStreamSubscription(IrohaService.java:205) ~[main/:na]
	at jdk.internal.reflect.GeneratedMethodAccessor39.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-6.0.11.jar:6.0.11]
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.0.11.jar:6.0.11]

Code snippet to subscribe to blocks streaming

val idToSubscription = iroha2AsyncClient.subscribeToBlockStream(latestBlockHeight.get(), Integer.MAX_VALUE);
val id = idToSubscription.getFirst().iterator().next().getId();
val subscription = idToSubscription.getSecond();
val publisher = ReactiveFlowKt.asPublisher(subscription.receive(id));

Flux.from(publisher)
        .subscribeOn(Schedulers.boundedElastic())
        .map(block -> (VersionedBlockMessage) block)
        .doOnNext(block -> {...})
        .doOnComplete(() -> {
            //never executed
        })
        .doOnCancel(() -> {
            //never executed
        })
        .doOnError(__ -> {
            //never executed
        })

Optional enhancement

No complete or cancel signal propagates to either of the //never executed blocks. Likely it happens since the coroutines-to-reactor adapter is used in the Java code. It would be great if the BlockStreamSubscription has isRunning() or a similar method.

@arndey arndey self-assigned this Jan 16, 2024
@arndey arndey added the iroha2 label Jan 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants