diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java index aad296652..08dc31733 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java @@ -34,6 +34,7 @@ import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; /** @@ -177,11 +178,20 @@ public T withConnection(final StatefulRedisClusterConnection con } } - public Publisher withConnectionReactive(final StatefulRedisClusterConnection connection, + public Publisher withConnectionReactive( + final StatefulRedisClusterConnection connection, final Function, Publisher> function) { - return Flux.from(function.apply(connection)) - .transformDeferred(RetryOperator.of(retry)); + final Publisher publisher = function.apply(connection); + + if (publisher instanceof Mono m) { + return m.transformDeferred(RetryOperator.of(retry)); + } + if (publisher instanceof Flux f) { + return f.transformDeferred(RetryOperator.of(retry)); + } + + return Flux.from(publisher).transformDeferred(RetryOperator.of(retry)); } public FaultTolerantPubSubClusterConnection createPubSubConnection() {