Skip to content

Commit

Permalink
Use Mono#transformDeferred when applicable
Browse files Browse the repository at this point in the history
  • Loading branch information
eager-signal authored and jon-signal committed Oct 10, 2024
1 parent 5d8b566 commit a3d9af1
Showing 1 changed file with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
Expand Down Expand Up @@ -177,11 +178,20 @@ public <T, K, V> T withConnection(final StatefulRedisClusterConnection<K, V> con
}
}

public <T, K, V> Publisher<T> withConnectionReactive(final StatefulRedisClusterConnection<K, V> connection,
public <T, K, V> Publisher<T> withConnectionReactive(
final StatefulRedisClusterConnection<K, V> connection,
final Function<StatefulRedisClusterConnection<K, V>, Publisher<T>> function) {

return Flux.from(function.apply(connection))
.transformDeferred(RetryOperator.of(retry));
final Publisher<T> publisher = function.apply(connection);

if (publisher instanceof Mono<T> m) {
return m.transformDeferred(RetryOperator.of(retry));
}
if (publisher instanceof Flux<T> f) {
return f.transformDeferred(RetryOperator.of(retry));
}

return Flux.from(publisher).transformDeferred(RetryOperator.of(retry));
}

public FaultTolerantPubSubClusterConnection<String, String> createPubSubConnection() {
Expand Down

0 comments on commit a3d9af1

Please sign in to comment.