diff --git a/fault-tolerance/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadImpl.java b/fault-tolerance/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadImpl.java index b3a26fb0805..7fceb796663 100644 --- a/fault-tolerance/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadImpl.java +++ b/fault-tolerance/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadImpl.java @@ -93,13 +93,12 @@ public T invoke(Supplier supplier) { // cannot release it in between attempts, as that would give window for another thread to change the state inProgressLock.lock(); - if (metricsEnabled) { - callsCounterMetric.increment(); - } - // execute immediately if semaphore can be acquired boolean acquired; try { + if (metricsEnabled) { + callsCounterMetric.increment(); + } acquired = inProgress.tryAcquire(); } catch (Throwable t) { inProgressLock.unlock(); @@ -121,27 +120,31 @@ public T invoke(Supplier supplier) { throw t; } if (full) { - callsRejected.incrementAndGet(); inProgressLock.unlock(); // this request will fail, release lock + callsRejected.incrementAndGet(); throw new BulkheadException("Bulkhead queue \"" + name + "\" is full"); } try { // block current thread until barrier is retracted Barrier barrier; - long start = metricsEnabled ? System.nanoTime() : 0L; + long start = 0L; try { listeners.forEach(l -> l.enqueueing(supplier)); if (metricsEnabled) { + start = System.nanoTime(); callsWaiting.incrementAndGet(); } barrier = queue.enqueue(supplier); } finally { - if (metricsEnabled) { - waitingDurationMetric.record(System.nanoTime() - start, TimeUnit.NANOSECONDS); - callsWaiting.decrementAndGet(); + try { + if (metricsEnabled) { + waitingDurationMetric.record(System.nanoTime() - start, TimeUnit.NANOSECONDS); + callsWaiting.decrementAndGet(); + } + } finally { + inProgressLock.unlock(); // we have enqueued, now we can wait } - inProgressLock.unlock(); // we have enqueued, now we can wait } if (barrier == null) {