From 027469ace479395b6609c02590418198004690a4 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Thu, 4 Apr 2024 20:46:13 +0200 Subject: [PATCH] [Java] Upgrade to Aeron 1.44.0. --- .../artio/engine/RecordingCoordinator.java | 14 ++++++------- .../engine/logger/RecordingIdLookup.java | 20 ++++++++++++++++++- .../artio/engine/logger/ReplayOperation.java | 3 ++- .../artio/engine/logger/ReplayIndexTest.java | 3 ++- .../logger/SequenceNumberIndexTest.java | 3 ++- .../engine/logger/SmallReplayIndexTest.java | 3 ++- build.gradle | 2 +- 7 files changed, 35 insertions(+), 13 deletions(-) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/RecordingCoordinator.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/RecordingCoordinator.java index 07ce6ef2bd..9cc314eb0c 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/RecordingCoordinator.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/RecordingCoordinator.java @@ -159,10 +159,10 @@ public static File recordingIdsFile(final EngineConfiguration configuration) if (configuration.logAnyMessages()) { counters = this.aeron.countersReader(); - framerInboundLookup = new RecordingIdLookup(archiverIdleStrategy, counters); - framerOutboundLookup = new RecordingIdLookup(archiverIdleStrategy, counters); - indexerInboundLookup = new RecordingIdLookup(archiverIdleStrategy, counters); - indexerOutboundLookup = new RecordingIdLookup(archiverIdleStrategy, counters); + framerInboundLookup = new RecordingIdLookup(archive.archiveId(), archiverIdleStrategy, counters); + framerOutboundLookup = new RecordingIdLookup(archive.archiveId(), archiverIdleStrategy, counters); + indexerInboundLookup = new RecordingIdLookup(archive.archiveId(), archiverIdleStrategy, counters); + indexerOutboundLookup = new RecordingIdLookup(archive.archiveId(), archiverIdleStrategy, counters); } else { @@ -522,7 +522,7 @@ private boolean startRecording( private boolean recordingAlreadyStarted(final int sessionId) { - return RecordingPos.findCounterIdBySession(counters, sessionId) != NULL_VALUE; + return RecordingPos.findCounterIdBySession(counters, sessionId, archive.archiveId()) != NULL_VALUE; } // awaits the recording start and saves the file @@ -603,7 +603,7 @@ private void awaitRecordingsCompletion( final List completingRecordings = new ArrayList<>(); aeronSessionIdToCompletionPosition.forEachLong((sessionId, completionPosition) -> { - final int counterId = RecordingPos.findCounterIdBySession(counters, (int)sessionId); + final int counterId = RecordingPos.findCounterIdBySession(counters, (int)sessionId, archive.archiveId()); // Recording has completed if (counterId != NULL_COUNTER_ID) { @@ -630,7 +630,7 @@ private void shutdownArchiver() it.next(); final long registrationId = it.getLongKey(); final long recordingId = it.getLongValue(); - final int counterId = RecordingPos.findCounterIdByRecording(counters, recordingId); + final int counterId = RecordingPos.findCounterIdByRecording(counters, recordingId, archive.archiveId()); archive.stopRecording(registrationId); if (counterId != NULL_COUNTER_ID) { diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/RecordingIdLookup.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/RecordingIdLookup.java index d346fc8e6b..d7fbd99643 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/RecordingIdLookup.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/RecordingIdLookup.java @@ -15,6 +15,7 @@ */ package uk.co.real_logic.artio.engine.logger; +import io.aeron.Aeron; import io.aeron.archive.status.RecordingPos; import org.agrona.collections.Long2LongHashMap; import org.agrona.concurrent.IdleStrategy; @@ -26,13 +27,30 @@ public class RecordingIdLookup { private final Long2LongHashMap aeronSessionIdToRecordingId = new Long2LongHashMap(NULL_RECORDING_ID); + private final long archiveId; private final IdleStrategy archiverIdleStrategy; private final CountersReader counters; + /** + * + * @param archiverIdleStrategy idle strategy. + * @param counters reader. + * @deprecated Use {@link #RecordingIdLookup(long, IdleStrategy, CountersReader)} instead. + */ + @Deprecated public RecordingIdLookup( final IdleStrategy archiverIdleStrategy, final CountersReader counters) { + this(Aeron.NULL_VALUE, archiverIdleStrategy, counters); + } + + public RecordingIdLookup( + final long archiveId, + final IdleStrategy archiverIdleStrategy, + final CountersReader counters) + { + this.archiveId = archiveId; this.archiverIdleStrategy = archiverIdleStrategy; this.counters = counters; } @@ -67,7 +85,7 @@ long findRecordingId(final int aeronSessionId) private long checkRecordingId(final int aeronSessionId) { - final int counterId = RecordingPos.findCounterIdBySession(counters, aeronSessionId); + final int counterId = RecordingPos.findCounterIdBySession(counters, aeronSessionId, archiveId); if (counterId == NULL_COUNTER_ID) { return NULL_RECORDING_ID; diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayOperation.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayOperation.java index e3ed02319c..eceabe5c3d 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayOperation.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayOperation.java @@ -425,7 +425,8 @@ int replayedMessages() private boolean archivingNotComplete(final long endPosition, final long recordingId) { - final int counterId = RecordingPos.findCounterIdByRecording(countersReader, recordingId); + final int counterId = + RecordingPos.findCounterIdByRecording(countersReader, recordingId, aeronArchive.archiveId()); // wait if the recording is active - otherwise assume that the recording has complete. if (counterId != CountersReader.NULL_COUNTER_ID) diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayIndexTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayIndexTest.java index b7011ed6bc..24f3d7b815 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayIndexTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayIndexTest.java @@ -135,7 +135,8 @@ public void setUp() mediaDriver = TestFixtures.launchMediaDriver(); aeronArchive = AeronArchive.connect(aeronArchiveContext()); - recordingIdLookup = new RecordingIdLookup(new YieldingIdleStrategy(), aeron().countersReader()); + recordingIdLookup = + new RecordingIdLookup(aeronArchive.archiveId(), new YieldingIdleStrategy(), aeron().countersReader()); aeronArchive.startRecording(CHANNEL, STREAM_ID, SourceLocation.LOCAL); diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/SequenceNumberIndexTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/SequenceNumberIndexTest.java index d90577ed4b..7bf5ed1161 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/SequenceNumberIndexTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/SequenceNumberIndexTest.java @@ -108,7 +108,8 @@ public void setUp() deleteFiles(); - recordingIdLookup = new RecordingIdLookup(YieldingIdleStrategy.INSTANCE, aeron.countersReader()); + recordingIdLookup = + new RecordingIdLookup(aeronArchive.archiveId(), YieldingIdleStrategy.INSTANCE, aeron.countersReader()); writer = newWriter(inMemoryBuffer); reader = new SequenceNumberIndexReader(inMemoryBuffer, errorHandler, recordingIdLookup, null); } diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/SmallReplayIndexTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/SmallReplayIndexTest.java index 9af63c4f7f..f38f7ea99c 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/SmallReplayIndexTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/SmallReplayIndexTest.java @@ -136,7 +136,8 @@ public void setUp() mediaDriver = TestFixtures.launchMediaDriver(); aeronArchive = AeronArchive.connect(aeronArchiveContext()); - recordingIdLookup = new RecordingIdLookup(new YieldingIdleStrategy(), aeron().countersReader()); + recordingIdLookup = + new RecordingIdLookup(aeronArchive.archiveId(), new YieldingIdleStrategy(), aeron().countersReader()); aeronArchive.startRecording(CHANNEL, STREAM_ID, SourceLocation.LOCAL); diff --git a/build.gradle b/build.gradle index 5e6e128e89..49fbfd9295 100644 --- a/build.gradle +++ b/build.gradle @@ -46,7 +46,7 @@ def byteBuddyVersion = '1.14.13' def agronaVersion = '1.21.1' def sbeVersion = '1.31.0' -def aeronVersion = '1.43.0' +def aeronVersion = '1.44.0' def artioGroup = 'uk.co.real-logic' def iLink3Enabled = false