From 9f67db4ca7e8d262362d2edc7531535b1fcf8d11 Mon Sep 17 00:00:00 2001 From: calvin681 Date: Sun, 13 Mar 2022 14:59:04 -0700 Subject: [PATCH] SinkClient fix logging on unsubscribe (#144) * Add logging to SinkClient to track uncleaned connections * Only count resubmission from worker failures * SinkClient fix logging on unsubscribe Co-authored-by: Calvin Cheung --- .../src/main/java/io/mantisrx/client/SinkClientImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mantis-client/src/main/java/io/mantisrx/client/SinkClientImpl.java b/mantis-client/src/main/java/io/mantisrx/client/SinkClientImpl.java index a152ba8a9..0a5aed2d1 100644 --- a/mantis-client/src/main/java/io/mantisrx/client/SinkClientImpl.java +++ b/mantis-client/src/main/java/io/mantisrx/client/SinkClientImpl.java @@ -144,10 +144,10 @@ public Observable call(EndpointChange endpointChange) { }) .doOnUnsubscribe(() -> { try { - logger.warn("Closing connections to sink of job " + jobId); + logger.warn("Closing connections to sink of job {}", jobId); closeAllConnections(); } catch (Exception e) { - Observable.error(e); + logger.warn("Error closing all connections to sink of job {}", jobId, e); } }) .share() @@ -212,7 +212,7 @@ public void call(Boolean flag) { } } return ((SinkConnection) sinkConnection).call() - //.flatMap(o -> o) + .takeWhile(e -> !nowClosed.get()) ; }