From 482047263a0f9ef8aa2ab9fe3d72d7f9a6ca9bcf Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Wed, 15 Jan 2025 09:48:55 +0200 Subject: [PATCH] AtomicBuffer acquire/release operations. The AtomicBuffer has release method for every ordered method. So for e.g. a AtomicBUffer.putLongOrdered, there is a putLongRelease. The ordered method will call the release version, so there is a sligth performance penalty. Also acquire get methods have been added that have slightly weaker memory ordering semantics compared to a volatile read. Which could provide more oppertunity for the JIT to do its magic and in theory could give better performance on ISAs with weaker memory models. --- .../agent/BufferAlignmentAgentTest.java | 16 ++-- .../agrona/concurrent/UnsafeBufferTests.java | 8 +- agrona/src/main/java/org/agrona/MarkFile.java | 4 +- .../org/agrona/concurrent/AtomicBuffer.java | 79 +++++++++++++++++-- .../org/agrona/concurrent/UnsafeBuffer.java | 58 ++++++++++++++ .../broadcast/BroadcastTransmitter.java | 6 +- .../concurrent/errors/DistinctErrorLog.java | 4 +- .../ringbuffer/ManyToOneRingBuffer.java | 30 +++---- .../ringbuffer/OneToOneRingBuffer.java | 26 +++--- .../concurrent/status/CountersManager.java | 24 +++--- .../agrona/concurrent/AtomicBufferTest.java | 20 ++--- .../broadcast/BroadcastTransmitterTest.java | 24 +++--- .../errors/DistinctErrorLogTest.java | 40 +++++----- .../ringbuffer/ManyToOneRingBufferTest.java | 60 +++++++------- .../ringbuffer/OneToOneRingBufferTest.java | 54 ++++++------- 15 files changed, 290 insertions(+), 163 deletions(-) diff --git a/agrona-agent/src/test/java/org/agrona/agent/BufferAlignmentAgentTest.java b/agrona-agent/src/test/java/org/agrona/agent/BufferAlignmentAgentTest.java index eab323e5a..e365a0b90 100644 --- a/agrona-agent/src/test/java/org/agrona/agent/BufferAlignmentAgentTest.java +++ b/agrona-agent/src/test/java/org/agrona/agent/BufferAlignmentAgentTest.java @@ -284,16 +284,16 @@ private void testAlignedAtomicMethods(final AtomicBuffer buffer, final int offse buffer.compareAndSetLong(offset + SIZE_OF_LONG, Long.MAX_VALUE, Long.MAX_VALUE); buffer.getAndAddLong(offset + SIZE_OF_LONG, Long.MAX_VALUE); buffer.getAndSetLong(offset + SIZE_OF_LONG, Long.MAX_VALUE); - buffer.putLongOrdered(offset + SIZE_OF_LONG, Long.MAX_VALUE); - buffer.addLongOrdered(offset + SIZE_OF_LONG, Long.MAX_VALUE); + buffer.putLongRelease(offset + SIZE_OF_LONG, Long.MAX_VALUE); + buffer.addLongRelease(offset + SIZE_OF_LONG, Long.MAX_VALUE); buffer.getIntVolatile(offset + SIZE_OF_INT); buffer.putIntVolatile(offset + SIZE_OF_INT, Integer.MAX_VALUE); buffer.compareAndSetInt(offset + SIZE_OF_INT, Integer.MAX_VALUE, Integer.MAX_VALUE); buffer.getAndAddInt(offset + SIZE_OF_INT, Integer.MAX_VALUE); buffer.getAndSetInt(offset + SIZE_OF_INT, Integer.MAX_VALUE); - buffer.putIntOrdered(offset + SIZE_OF_INT, Integer.MAX_VALUE); - buffer.addIntOrdered(offset + SIZE_OF_INT, Integer.MAX_VALUE); + buffer.putIntRelease(offset + SIZE_OF_INT, Integer.MAX_VALUE); + buffer.addIntRelease(offset + SIZE_OF_INT, Integer.MAX_VALUE); buffer.getShortVolatile(offset + SIZE_OF_SHORT); buffer.putShortVolatile(offset + SIZE_OF_SHORT, Short.MAX_VALUE); @@ -319,8 +319,8 @@ private void testUnAlignedAtomicMethods(final AtomicBuffer buffer, final int off addressOffset, offset + SIZE_OF_INT, (i) -> buffer.compareAndSetLong(i, Long.MAX_VALUE, Long.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.getAndAddLong(i, Long.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.getAndSetLong(i, Long.MAX_VALUE)); - assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.putLongOrdered(i, Long.MAX_VALUE)); - assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.addLongOrdered(i, Long.MAX_VALUE)); + assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.putLongRelease(i, Long.MAX_VALUE)); + assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.addLongRelease(i, Long.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, buffer::getIntVolatile); assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntVolatile(i, Integer.MAX_VALUE)); @@ -328,8 +328,8 @@ private void testUnAlignedAtomicMethods(final AtomicBuffer buffer, final int off (i) -> buffer.compareAndSetInt(i, Integer.MAX_VALUE, Integer.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.getAndAddInt(i, Integer.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.getAndSetInt(i, Integer.MAX_VALUE)); - assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntOrdered(i, Integer.MAX_VALUE)); - assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.addIntOrdered(i, Integer.MAX_VALUE)); + assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntRelease(i, Integer.MAX_VALUE)); + assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.addIntRelease(i, Integer.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_BYTE, buffer::getShortVolatile); assertUnaligned(addressOffset, offset + SIZE_OF_BYTE, (i) -> buffer.putShortVolatile(i, Short.MAX_VALUE)); diff --git a/agrona-concurrency-tests/src/main/java/org/agrona/concurrent/UnsafeBufferTests.java b/agrona-concurrency-tests/src/main/java/org/agrona/concurrent/UnsafeBufferTests.java index 65ad93dbf..47b9175b4 100644 --- a/agrona-concurrency-tests/src/main/java/org/agrona/concurrent/UnsafeBufferTests.java +++ b/agrona-concurrency-tests/src/main/java/org/agrona/concurrent/UnsafeBufferTests.java @@ -40,7 +40,7 @@ public class UnsafeBufferTests /** * Test that verifies the atomicity of the {@link UnsafeBuffer#putLongVolatile(int, long)}, - * {@link UnsafeBuffer#putLongOrdered(int, long)} and {@link UnsafeBuffer#getLongVolatile(int)}. + * {@link UnsafeBuffer#putLongRelease(int, long)} (int, long)} and {@link UnsafeBuffer#getLongVolatile(int)}. */ @JCStressTest @Outcome(id = "0", expect = Expect.ACCEPTABLE, desc = "read before writes") @@ -71,7 +71,7 @@ public void putLongVolatile() @Actor public void putLongOrdered() { - buffer.putLongOrdered(WRITE_INDEX, Long.MAX_VALUE - 1); + buffer.putLongRelease(WRITE_INDEX, Long.MAX_VALUE - 1); } /** @@ -88,7 +88,7 @@ public void actor2(final J_Result result) /** * Test that verifies the atomicity of the {@link UnsafeBuffer#putIntVolatile(int, int)}, - * {@link UnsafeBuffer#putIntOrdered(int, int)} and {@link UnsafeBuffer#getIntVolatile(int)}. + * {@link UnsafeBuffer#putIntRelease(int, int)} (int, int)} and {@link UnsafeBuffer#getIntVolatile(int)}. */ @JCStressTest @Outcome(id = "0", expect = Expect.ACCEPTABLE, desc = "read before writes") @@ -119,7 +119,7 @@ public void putIntVolatile() @Actor public void putIntOrdered() { - buffer.putIntOrdered(WRITE_INDEX, 222222222); + buffer.putIntRelease(WRITE_INDEX, 222222222); } /** diff --git a/agrona/src/main/java/org/agrona/MarkFile.java b/agrona/src/main/java/org/agrona/MarkFile.java index bcf1a70d6..6d31b18fb 100644 --- a/agrona/src/main/java/org/agrona/MarkFile.java +++ b/agrona/src/main/java/org/agrona/MarkFile.java @@ -259,7 +259,7 @@ public void close() */ public void signalReady(final int version) { - buffer.putIntOrdered(versionFieldOffset, version); + buffer.putIntRelease(versionFieldOffset, version); } /** @@ -289,7 +289,7 @@ public int versionWeak() */ public void timestampOrdered(final long timestamp) { - buffer.putLongOrdered(timestampFieldOffset, timestamp); + buffer.putLongRelease(timestampFieldOffset, timestamp); } /** diff --git a/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java b/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java index d2b61910a..ede9f6083 100644 --- a/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java +++ b/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java @@ -22,6 +22,13 @@ /** * Abstraction over a range of buffer types that allows type to be accessed with various memory ordering semantics. + *

+ * Before Java 9, there was no standard for stores with release semantics. On the AtomicLong there was the + * {@link java.util.concurrent.atomic.AtomicLong#lazySet(long)}. Because there was no standard, the AtomicBuffer + * has methods like {@link #putLongOrdered(int, long)}. With Java 9, the 'release' name has been introduced. + * The AtomicBuffer also has methods with release methods which are identical to the ordered methods. All the + * methods with 'ordered' semantics will call the equivalent method with release semantics. This introduces a small + * performance penalty and should encourage users to switch to the newer methods. */ public interface AtomicBuffer extends MutableDirectBuffer { @@ -53,8 +60,8 @@ public interface AtomicBuffer extends MutableDirectBuffer * Verify that the underlying buffer is correctly aligned to prevent word tearing, other ordering issues and * the JVM crashes. In particular this method verifies that the starting offset of the underlying buffer is properly * aligned. However, the actual atomic call must ensure that the index is properly aligned, i.e. it must be aligned - * to the size of the operand. For example a call to any of the following methods {@link #putIntOrdered(int, int)}, - * {@link #putIntVolatile(int, int)}, {@link #addIntOrdered(int, int)}, {@link #getIntVolatile(int)}, + * to the size of the operand. For example a call to any of the following methods {@link #putIntRelease(int, int)}, + * {@link #putIntVolatile(int, int)}, {@link #addIntRelease(int, int)} (int, int)}, {@link #getIntVolatile(int)}, * {@link #getAndAddInt(int, int)} or {@link #getAndSetInt(int, int)}, must have the index aligned by four bytes * (e.g. {@code 0, 4, 8, 12, 60 etc.}). *

@@ -88,6 +95,14 @@ public interface AtomicBuffer extends MutableDirectBuffer */ long getLongVolatile(int index); + /** + * Atomically get the value at a given index with acquire semantics. + * + * @param index in bytes from which to get. + * @return the value for at a given index. + */ + long getLongAcquire(int index); + /** * Atomically put a value to a given index with volatile semantics. *

@@ -101,17 +116,29 @@ public interface AtomicBuffer extends MutableDirectBuffer /** * Atomically put a value to a given index with ordered store semantics. *

- * This call has release semantics. + * Instead of using this method, use {@link #putLongRelease(int, long)} instead. They + * are identical and the putLongRelease is the preferred version. * * @param index in bytes for where to put. * @param value for at a given index. */ void putLongOrdered(int index, long value); + /** + * Atomically put a value to a given index with release semantics. + * + * @param index in bytes for where to put. + * @param value for at a given index. + */ + void putLongRelease(int index, long value); + /** * Atomically adds a value to a given index with ordered store semantics. Use a negative increment to decrement. *

* The load has no ordering semantics. The store has release semantics. + *

+ * Instead of using this method, use {@link #addLongRelease(int, long)} instead. They + * are identical but the addLongRelease is the preferred version. * * @param index in bytes for where to put. * @param increment by which the value at the index will be adjusted. @@ -119,6 +146,17 @@ public interface AtomicBuffer extends MutableDirectBuffer */ long addLongOrdered(int index, long increment); + /** + * Atomically adds a value to a given index with ordered store semantics. Use a negative increment to decrement. + *

+ * The load has no ordering semantics. The store has release semantics. + * + * @param index in bytes for where to put. + * @param increment by which the value at the index will be adjusted. + * @return the previous value at the index. + */ + long addLongRelease(int index, long increment); + /** * Atomic compare and set of a long given an expected value. *

@@ -174,20 +212,40 @@ public interface AtomicBuffer extends MutableDirectBuffer */ void putIntVolatile(int index, int value); + /** + * Atomically get the value at a given index with acquire semantics. + * + * @param index in bytes from which to get. + * @return the value for at a given index. + */ + int getIntAcquire(int index); + /** * Atomically put a value to a given index with ordered semantics. *

- * This call has release semantics. - * + * Instead of using this method, use {@link #putIntRelease} instead. They + * are identical but the putIntRelease is the preferred version. + * @param index in bytes for where to put. * @param value for at a given index. */ void putIntOrdered(int index, int value); + /** + * Atomically put a value to a given index with release semantics. + * + * @param index in bytes for where to put. + * @param value for at a given index. + */ + void putIntRelease(int index, int value); + /** * Atomically add a value to a given index with ordered store semantics. Use a negative increment to decrement. *

* The load has no ordering semantics. The store has release semantics. + *

+ * Instead of using this method, use {@link #addIntRelease(int, int)} instead. They + * are identical but the addIntRelease is the preferred version. * * @param index in bytes for where to put. * @param increment by which the value at the index will be adjusted. @@ -195,6 +253,17 @@ public interface AtomicBuffer extends MutableDirectBuffer */ int addIntOrdered(int index, int increment); + /** + * Atomically add a value to a given index with release semantics. Use a negative increment to decrement. + *

+ * The load has no ordering semantics. The store has release semantics. + * + * @param index in bytes for where to put. + * @param increment by which the value at the index will be adjusted. + * @return the previous value at the index. + */ + int addIntRelease(int index, int increment); + /** * Atomic compare and set of an int given an expected value. *

diff --git a/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java b/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java index cf2a9f7b6..355c61d4c 100644 --- a/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java +++ b/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java @@ -398,10 +398,31 @@ public void putLongVolatile(final int index, final long value) UnsafeApi.putLongVolatile(byteArray, addressOffset + index, value); } + /** + * {@inheritDoc} + */ + public long getLongAcquire(final int index) + { + if (SHOULD_BOUNDS_CHECK) + { + boundsCheck0(index, SIZE_OF_LONG); + } + + return UnsafeApi.getLongAcquire(byteArray, addressOffset + index); + } + /** * {@inheritDoc} */ public void putLongOrdered(final int index, final long value) + { + putLongRelease(index, value); + } + + /** + * {@inheritDoc} + */ + public void putLongRelease(final int index, final long value) { if (SHOULD_BOUNDS_CHECK) { @@ -415,6 +436,14 @@ public void putLongOrdered(final int index, final long value) * {@inheritDoc} */ public long addLongOrdered(final int index, final long increment) + { + return addLongRelease(index, increment); + } + + /** + * {@inheritDoc} + */ + public long addLongRelease(final int index, final long increment) { if (SHOULD_BOUNDS_CHECK) { @@ -489,10 +518,31 @@ public void putIntVolatile(final int index, final int value) UnsafeApi.putIntVolatile(byteArray, addressOffset + index, value); } + /** + * {@inheritDoc} + */ + public int getIntAcquire(final int index) + { + if (SHOULD_BOUNDS_CHECK) + { + boundsCheck0(index, SIZE_OF_INT); + } + + return UnsafeApi.getIntAcquire(byteArray, addressOffset + index); + } + /** * {@inheritDoc} */ public void putIntOrdered(final int index, final int value) + { + putIntRelease(index, value); + } + + /** + * {@inheritDoc} + */ + public void putIntRelease(final int index, final int value) { if (SHOULD_BOUNDS_CHECK) { @@ -506,6 +556,14 @@ public void putIntOrdered(final int index, final int value) * {@inheritDoc} */ public int addIntOrdered(final int index, final int increment) + { + return addIntRelease(index, increment); + } + + /** + * {@inheritDoc} + */ + public int addIntRelease(final int index, final int increment) { if (SHOULD_BOUNDS_CHECK) { diff --git a/agrona/src/main/java/org/agrona/concurrent/broadcast/BroadcastTransmitter.java b/agrona/src/main/java/org/agrona/concurrent/broadcast/BroadcastTransmitter.java index 06e5d5ead..c82d50a0c 100644 --- a/agrona/src/main/java/org/agrona/concurrent/broadcast/BroadcastTransmitter.java +++ b/agrona/src/main/java/org/agrona/concurrent/broadcast/BroadcastTransmitter.java @@ -122,13 +122,13 @@ public void transmit(final int msgTypeId, final DirectBuffer srcBuffer, final in buffer.putBytes(msgOffset(recordOffset), srcBuffer, srcIndex, length); - buffer.putLongOrdered(latestCounterIndex, currentTail); - buffer.putLongOrdered(tailCounterIndex, currentTail + recordLengthAligned); + buffer.putLongRelease(latestCounterIndex, currentTail); + buffer.putLongRelease(tailCounterIndex, currentTail + recordLengthAligned); } private void signalTailIntent(final AtomicBuffer buffer, final long newTail) { - buffer.putLongOrdered(tailIntentCountIndex, newTail); + buffer.putLongRelease(tailIntentCountIndex, newTail); VarHandle.releaseFence(); } diff --git a/agrona/src/main/java/org/agrona/concurrent/errors/DistinctErrorLog.java b/agrona/src/main/java/org/agrona/concurrent/errors/DistinctErrorLog.java index fdd521a74..3002d790b 100644 --- a/agrona/src/main/java/org/agrona/concurrent/errors/DistinctErrorLog.java +++ b/agrona/src/main/java/org/agrona/concurrent/errors/DistinctErrorLog.java @@ -177,7 +177,7 @@ public boolean record(final Throwable observation) final int offset = distinctObservation.offset; buffer.getAndAddInt(offset + OBSERVATION_COUNT_OFFSET, 1); - buffer.putLongOrdered(offset + LAST_OBSERVATION_TIMESTAMP_OFFSET, timestampMs); + buffer.putLongRelease(offset + LAST_OBSERVATION_TIMESTAMP_OFFSET, timestampMs); return true; } @@ -279,7 +279,7 @@ private DistinctObservation newObservation(final long timestampMs, final Throwab final DistinctObservation distinctObservation = new DistinctObservation(observation, offset); distinctObservations = prepend(distinctObservations, distinctObservation); - buffer.putIntOrdered(offset + LENGTH_OFFSET, length); + buffer.putIntRelease(offset + LENGTH_OFFSET, length); return distinctObservation; } diff --git a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java index edc51f72e..7ca2a65ae 100644 --- a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java +++ b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java @@ -97,12 +97,12 @@ public boolean write(final int msgTypeId, final DirectBuffer srcBuffer, final in return false; } - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + buffer.putIntRelease(lengthOffset(recordIndex), -recordLength); VarHandle.releaseFence(); buffer.putBytes(encodedMsgOffset(recordIndex), srcBuffer, offset, length); buffer.putInt(typeOffset(recordIndex), msgTypeId); - buffer.putIntOrdered(lengthOffset(recordIndex), recordLength); + buffer.putIntRelease(lengthOffset(recordIndex), recordLength); return true; } @@ -124,7 +124,7 @@ public int tryClaim(final int msgTypeId, final int length) return recordIndex; } - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + buffer.putIntRelease(lengthOffset(recordIndex), -recordLength); VarHandle.releaseFence(); buffer.putInt(typeOffset(recordIndex), msgTypeId); @@ -140,7 +140,7 @@ public void commit(final int index) final AtomicBuffer buffer = this.buffer; final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + buffer.putIntRelease(lengthOffset(recordIndex), -recordLength); } /** @@ -153,7 +153,7 @@ public void abort(final int index) final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); buffer.putInt(typeOffset(recordIndex), PADDING_MSG_TYPE_ID); - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + buffer.putIntRelease(lengthOffset(recordIndex), -recordLength); } /** @@ -208,7 +208,7 @@ public int read(final MessageHandler handler, final int messageCountLimit) if (bytesRead > 0) { buffer.setMemory(headIndex, bytesRead, (byte)0); - buffer.putLongOrdered(headPositionIndex, head + bytesRead); + buffer.putLongRelease(headPositionIndex, head + bytesRead); } } @@ -277,7 +277,7 @@ public int controlledRead(final ControlledMessageHandler handler, final int mess if (COMMIT == action) { buffer.setMemory(headIndex, bytesRead, (byte)0); - buffer.putLongOrdered(headPositionIndex, head + bytesRead); + buffer.putLongRelease(headPositionIndex, head + bytesRead); headIndex += bytesRead; head += bytesRead; bytesRead = 0; @@ -289,7 +289,7 @@ public int controlledRead(final ControlledMessageHandler handler, final int mess if (bytesRead > 0) { buffer.setMemory(headIndex, bytesRead, (byte)0); - buffer.putLongOrdered(headPositionIndex, head + bytesRead); + buffer.putLongRelease(headPositionIndex, head + bytesRead); } } @@ -325,7 +325,7 @@ public AtomicBuffer buffer() */ public void consumerHeartbeatTime(final long time) { - buffer.putLongOrdered(consumerHeartbeatIndex, time); + buffer.putLongRelease(consumerHeartbeatIndex, time); } /** @@ -408,7 +408,7 @@ public boolean unblock() if (length < 0) { buffer.putInt(typeOffset(consumerIndex), PADDING_MSG_TYPE_ID); - buffer.putIntOrdered(lengthOffset(consumerIndex), -length); + buffer.putIntRelease(lengthOffset(consumerIndex), -length); unblocked = true; } else if (0 == length) @@ -426,7 +426,7 @@ else if (0 == length) if (scanBackToConfirmStillZeroed(buffer, i, consumerIndex)) { buffer.putInt(typeOffset(consumerIndex), PADDING_MSG_TYPE_ID); - buffer.putIntOrdered(lengthOffset(consumerIndex), i - consumerIndex); + buffer.putIntRelease(lengthOffset(consumerIndex), i - consumerIndex); unblocked = true; } @@ -501,7 +501,7 @@ private int claimCapacity(final AtomicBuffer buffer, final int recordLength) return INSUFFICIENT_CAPACITY; } - buffer.putLongOrdered(headCachePositionIndex, head); + buffer.putLongRelease(headCachePositionIndex, head); } newTail = tail + requiredCapacity; @@ -525,7 +525,7 @@ private int claimCapacity(final AtomicBuffer buffer, final int recordLength) newTail = tail; // Do not claim any actual space, only pad to the buffer end } - buffer.putLongOrdered(headCachePositionIndex, head); + buffer.putLongRelease(headCachePositionIndex, head); } padding = toBufferEndLength; @@ -536,11 +536,11 @@ private int claimCapacity(final AtomicBuffer buffer, final int recordLength) if (0 != padding) { - buffer.putIntOrdered(lengthOffset(tailIndex), -padding); + buffer.putIntRelease(lengthOffset(tailIndex), -padding); VarHandle.releaseFence(); buffer.putInt(typeOffset(tailIndex), PADDING_MSG_TYPE_ID); - buffer.putIntOrdered(lengthOffset(tailIndex), padding); + buffer.putIntRelease(lengthOffset(tailIndex), padding); } return writeIndex; diff --git a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java index 2425e0ffc..e83d86789 100644 --- a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java +++ b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java @@ -100,12 +100,12 @@ public boolean write(final int msgTypeId, final DirectBuffer srcBuffer, final in return false; } - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + buffer.putIntRelease(lengthOffset(recordIndex), -recordLength); VarHandle.releaseFence(); buffer.putBytes(encodedMsgOffset(recordIndex), srcBuffer, offset, length); buffer.putInt(typeOffset(recordIndex), msgTypeId); - buffer.putIntOrdered(lengthOffset(recordIndex), recordLength); + buffer.putIntRelease(lengthOffset(recordIndex), recordLength); return true; } @@ -127,7 +127,7 @@ public int tryClaim(final int msgTypeId, final int length) return recordIndex; } - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + buffer.putIntRelease(lengthOffset(recordIndex), -recordLength); VarHandle.releaseFence(); buffer.putInt(typeOffset(recordIndex), msgTypeId); @@ -143,7 +143,7 @@ public void commit(final int index) final AtomicBuffer buffer = this.buffer; final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + buffer.putIntRelease(lengthOffset(recordIndex), -recordLength); } /** @@ -156,7 +156,7 @@ public void abort(final int index) final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); buffer.putInt(typeOffset(recordIndex), PADDING_MSG_TYPE_ID); - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + buffer.putIntRelease(lengthOffset(recordIndex), -recordLength); } /** @@ -211,7 +211,7 @@ public int read(final MessageHandler handler, final int messageCountLimit) { if (bytesRead > 0) { - buffer.putLongOrdered(headPositionIndex, head + bytesRead); + buffer.putLongRelease(headPositionIndex, head + bytesRead); } } @@ -280,7 +280,7 @@ public int controlledRead(final ControlledMessageHandler handler, final int mess } if (COMMIT == action) { - buffer.putLongOrdered(headPositionIndex, head + bytesRead); + buffer.putLongRelease(headPositionIndex, head + bytesRead); headIndex += bytesRead; head += bytesRead; bytesRead = 0; @@ -291,7 +291,7 @@ public int controlledRead(final ControlledMessageHandler handler, final int mess { if (bytesRead > 0) { - buffer.putLongOrdered(headPositionIndex, head + bytesRead); + buffer.putLongRelease(headPositionIndex, head + bytesRead); } } @@ -327,7 +327,7 @@ public AtomicBuffer buffer() */ public void consumerHeartbeatTime(final long time) { - buffer.putLongOrdered(consumerHeartbeatIndex, time); + buffer.putLongRelease(consumerHeartbeatIndex, time); } /** @@ -441,7 +441,7 @@ private int claimCapacity(final AtomicBuffer buffer, final int recordLength) if (alignedRecordLength == toBufferEndLength) // message fits within the end of the buffer { - buffer.putLongOrdered(tailPositionIndex, nextTail); + buffer.putLongRelease(tailPositionIndex, nextTail); buffer.putLong(0, 0L); // pre-zero next message header return recordIndex; } @@ -467,16 +467,16 @@ else if (requiredCapacity > toBufferEndLength) nextTail += padding; } - buffer.putLongOrdered(tailPositionIndex, nextTail); + buffer.putLongRelease(tailPositionIndex, nextTail); if (0 != padding) { buffer.putLong(0, 0L); - buffer.putIntOrdered(lengthOffset(recordIndex), -padding); + buffer.putIntRelease(lengthOffset(recordIndex), -padding); VarHandle.releaseFence(); buffer.putInt(typeOffset(recordIndex), PADDING_MSG_TYPE_ID); - buffer.putIntOrdered(lengthOffset(recordIndex), padding); + buffer.putIntRelease(lengthOffset(recordIndex), padding); } if (INSUFFICIENT_CAPACITY != writeIndex) diff --git a/agrona/src/main/java/org/agrona/concurrent/status/CountersManager.java b/agrona/src/main/java/org/agrona/concurrent/status/CountersManager.java index 3e75333b5..aec0dda0e 100644 --- a/agrona/src/main/java/org/agrona/concurrent/status/CountersManager.java +++ b/agrona/src/main/java/org/agrona/concurrent/status/CountersManager.java @@ -209,7 +209,7 @@ public int allocate(final String label, final int typeId) metaDataBuffer.putLong(recordOffset + FREE_FOR_REUSE_DEADLINE_OFFSET, NOT_FREE_TO_REUSE); putLabel(recordOffset, label); - metaDataBuffer.putIntOrdered(recordOffset, RECORD_ALLOCATED); + metaDataBuffer.putIntRelease(recordOffset, RECORD_ALLOCATED); } catch (final Exception ex) { @@ -244,7 +244,7 @@ public int allocate(final String label, final int typeId, final Consumer