From a3d9af132fca68b7a9b5e1454e15b239694f93b3 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Tue, 1 Oct 2024 10:27:40 -0500 Subject: [PATCH] Use Mono#transformDeferred when applicable --- .../redis/FaultTolerantRedisClusterClient.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java index aad296652..08dc31733 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java @@ -34,6 +34,7 @@ import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; /** @@ -177,11 +178,20 @@ public T withConnection(final StatefulRedisClusterConnection con } } - public Publisher withConnectionReactive(final StatefulRedisClusterConnection connection, + public Publisher withConnectionReactive( + final StatefulRedisClusterConnection connection, final Function, Publisher> function) { - return Flux.from(function.apply(connection)) - .transformDeferred(RetryOperator.of(retry)); + final Publisher publisher = function.apply(connection); + + if (publisher instanceof Mono m) { + return m.transformDeferred(RetryOperator.of(retry)); + } + if (publisher instanceof Flux f) { + return f.transformDeferred(RetryOperator.of(retry)); + } + + return Flux.from(publisher).transformDeferred(RetryOperator.of(retry)); } public FaultTolerantPubSubClusterConnection createPubSubConnection() {