Skip to content

Commit

Permalink
[Java] Log event for unknown session close in service container rathe…
Browse files Browse the repository at this point in the history
…r than throw and exception.

(cherry picked from commit 752cc9c)
  • Loading branch information
mjpt777 authored and vyazelenko committed Nov 9, 2023
1 parent 5f90f54 commit 797d19e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -478,26 +478,28 @@ void onSessionClose(
{
this.logPosition = logPosition;
clusterTime = timestamp;
final ContainerClientSession session = sessionByIdMap.remove(clusterSessionId);

final ContainerClientSession session = sessionByIdMap.remove(clusterSessionId);
if (null == session)
{
throw new ClusterException(
"unknown clusterSessionId=" + clusterSessionId + " for close reason=" + closeReason +
" leadershipTermId=" + leadershipTermId + " logPosition=" + logPosition);
ctx.countedErrorHandler().onError(new ClusterEvent(
"unknown session close: clusterSessionId=" + clusterSessionId + " closeReason=" + closeReason +
" leadershipTermId=" + leadershipTermId + " logPosition=" + logPosition));
}

for (int i = 0, size = sessions.size(); i < size; i++)
else
{
if (sessions.get(i).id() == clusterSessionId)
for (int i = 0, size = sessions.size(); i < size; i++)
{
sessions.remove(i);
break;
if (sessions.get(i).id() == clusterSessionId)
{
sessions.remove(i);
break;
}
}
}

session.disconnect(ctx.countedErrorHandler());
service.onSessionClose(session, timestamp, closeReason);
session.disconnect(ctx.countedErrorHandler());
service.onSessionClose(session, timestamp, closeReason);
}
}

void onServiceAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,33 @@
*/
package io.aeron.cluster.service;

import io.aeron.Aeron;
import io.aeron.ConcurrentPublication;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.UnavailableCounterHandler;
import io.aeron.*;
import io.aeron.cluster.client.AeronCluster;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.test.CountersAnswer;
import io.aeron.test.Tests;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.*;
import org.agrona.concurrent.errors.DistinctErrorLog;
import org.agrona.concurrent.errors.ErrorLogReader;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

import static io.aeron.Aeron.NULL_VALUE;
import static io.aeron.AeronCounters.CLUSTER_COMMIT_POSITION_TYPE_ID;
import static io.aeron.AeronCounters.CLUSTER_RECOVERY_STATE_TYPE_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.*;

class ClusteredServiceAgentTest
Expand Down Expand Up @@ -124,4 +126,28 @@ void shouldAbortClusteredServiceIfCommitPositionCounterIsClosed()
nanoClock.advance(TimeUnit.MILLISECONDS.toNanos(2));
assertThrowsExactly(ClusterTerminationException.class, clusteredServiceAgent::doWork);
}

@Test
void shouldLogErrorInsteadOfThrowingIfSessionIsNotFoundOnClose()
{
final Aeron aeron = mock(Aeron.class);
final DistinctErrorLog distinctErrorLog = new DistinctErrorLog(
new UnsafeBuffer(ByteBuffer.allocateDirect(16384)), new SystemEpochClock());
final CountersManager countersManager = Tests.newCountersMananger(16 * 1024);
final AtomicCounter errorCounter = countersManager.newCounter("test");
final long originalErrorCount = errorCounter.get();

final ErrorHandler errorHandler = CommonContext.setupErrorHandler(null, distinctErrorLog);
final CountedErrorHandler countedErrorHandler = new CountedErrorHandler(errorHandler, errorCounter);
final ClusteredServiceContainer.Context ctx = new ClusteredServiceContainer.Context()
.aeron(aeron)
.idleStrategySupplier(() -> YieldingIdleStrategy.INSTANCE)
.countedErrorHandler(countedErrorHandler);
final ClusteredServiceAgent clusteredServiceAgent = new ClusteredServiceAgent(ctx);

clusteredServiceAgent.onSessionClose(99, 999, 9999, 99999, CloseReason.CLIENT_ACTION);

assertEquals(originalErrorCount + 1, errorCounter.get());
assertTrue(ErrorLogReader.hasErrors(distinctErrorLog.buffer()));
}
}

0 comments on commit 797d19e

Please sign in to comment.