From 797d19ebad06683da0189b152338714b87aa3918 Mon Sep 17 00:00:00 2001 From: mjpt777 Date: Fri, 27 Oct 2023 09:46:47 +0100 Subject: [PATCH] [Java] Log event for unknown session close in service container rather than throw and exception. (cherry picked from commit 752cc9cc0a9eddf6e4e34c385fdeeb954b2a645d) --- .../service/ClusteredServiceAgent.java | 26 +++++++------- .../service/ClusteredServiceAgentTest.java | 36 ++++++++++++++++--- 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java index 851e059961..dcf75fcc4d 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java @@ -478,26 +478,28 @@ void onSessionClose( { this.logPosition = logPosition; clusterTime = timestamp; - final ContainerClientSession session = sessionByIdMap.remove(clusterSessionId); + final ContainerClientSession session = sessionByIdMap.remove(clusterSessionId); if (null == session) { - throw new ClusterException( - "unknown clusterSessionId=" + clusterSessionId + " for close reason=" + closeReason + - " leadershipTermId=" + leadershipTermId + " logPosition=" + logPosition); + ctx.countedErrorHandler().onError(new ClusterEvent( + "unknown session close: clusterSessionId=" + clusterSessionId + " closeReason=" + closeReason + + " leadershipTermId=" + leadershipTermId + " logPosition=" + logPosition)); } - - for (int i = 0, size = sessions.size(); i < size; i++) + else { - if (sessions.get(i).id() == clusterSessionId) + for (int i = 0, size = sessions.size(); i < size; i++) { - sessions.remove(i); - break; + if (sessions.get(i).id() == clusterSessionId) + { + sessions.remove(i); + break; + } } - } - session.disconnect(ctx.countedErrorHandler()); - service.onSessionClose(session, timestamp, closeReason); + session.disconnect(ctx.countedErrorHandler()); + service.onSessionClose(session, timestamp, closeReason); + } } void onServiceAction( diff --git a/aeron-cluster/src/test/java/io/aeron/cluster/service/ClusteredServiceAgentTest.java b/aeron-cluster/src/test/java/io/aeron/cluster/service/ClusteredServiceAgentTest.java index 45fec599db..e3bb81fd83 100644 --- a/aeron-cluster/src/test/java/io/aeron/cluster/service/ClusteredServiceAgentTest.java +++ b/aeron-cluster/src/test/java/io/aeron/cluster/service/ClusteredServiceAgentTest.java @@ -15,24 +15,25 @@ */ package io.aeron.cluster.service; -import io.aeron.Aeron; -import io.aeron.ConcurrentPublication; -import io.aeron.Publication; -import io.aeron.Subscription; -import io.aeron.UnavailableCounterHandler; +import io.aeron.*; import io.aeron.cluster.client.AeronCluster; +import io.aeron.cluster.codecs.CloseReason; import io.aeron.driver.DutyCycleTracker; import io.aeron.logbuffer.BufferClaim; import io.aeron.test.CountersAnswer; import io.aeron.test.Tests; import org.agrona.DirectBuffer; +import org.agrona.ErrorHandler; import org.agrona.MutableDirectBuffer; import org.agrona.concurrent.*; import org.agrona.concurrent.errors.DistinctErrorLog; +import org.agrona.concurrent.errors.ErrorLogReader; +import org.agrona.concurrent.status.AtomicCounter; import org.agrona.concurrent.status.CountersManager; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; import static io.aeron.Aeron.NULL_VALUE; @@ -40,6 +41,7 @@ import static io.aeron.AeronCounters.CLUSTER_RECOVERY_STATE_TYPE_ID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrowsExactly; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; class ClusteredServiceAgentTest @@ -124,4 +126,28 @@ void shouldAbortClusteredServiceIfCommitPositionCounterIsClosed() nanoClock.advance(TimeUnit.MILLISECONDS.toNanos(2)); assertThrowsExactly(ClusterTerminationException.class, clusteredServiceAgent::doWork); } + + @Test + void shouldLogErrorInsteadOfThrowingIfSessionIsNotFoundOnClose() + { + final Aeron aeron = mock(Aeron.class); + final DistinctErrorLog distinctErrorLog = new DistinctErrorLog( + new UnsafeBuffer(ByteBuffer.allocateDirect(16384)), new SystemEpochClock()); + final CountersManager countersManager = Tests.newCountersMananger(16 * 1024); + final AtomicCounter errorCounter = countersManager.newCounter("test"); + final long originalErrorCount = errorCounter.get(); + + final ErrorHandler errorHandler = CommonContext.setupErrorHandler(null, distinctErrorLog); + final CountedErrorHandler countedErrorHandler = new CountedErrorHandler(errorHandler, errorCounter); + final ClusteredServiceContainer.Context ctx = new ClusteredServiceContainer.Context() + .aeron(aeron) + .idleStrategySupplier(() -> YieldingIdleStrategy.INSTANCE) + .countedErrorHandler(countedErrorHandler); + final ClusteredServiceAgent clusteredServiceAgent = new ClusteredServiceAgent(ctx); + + clusteredServiceAgent.onSessionClose(99, 999, 9999, 99999, CloseReason.CLIENT_ACTION); + + assertEquals(originalErrorCount + 1, errorCounter.get()); + assertTrue(ErrorLogReader.hasErrors(distinctErrorLog.buffer())); + } }