diff --git a/agrona-agent/src/test/java/org/agrona/agent/BufferAlignmentAgentTest.java b/agrona-agent/src/test/java/org/agrona/agent/BufferAlignmentAgentTest.java index eab323e5a..e365a0b90 100644 --- a/agrona-agent/src/test/java/org/agrona/agent/BufferAlignmentAgentTest.java +++ b/agrona-agent/src/test/java/org/agrona/agent/BufferAlignmentAgentTest.java @@ -284,16 +284,16 @@ private void testAlignedAtomicMethods(final AtomicBuffer buffer, final int offse buffer.compareAndSetLong(offset + SIZE_OF_LONG, Long.MAX_VALUE, Long.MAX_VALUE); buffer.getAndAddLong(offset + SIZE_OF_LONG, Long.MAX_VALUE); buffer.getAndSetLong(offset + SIZE_OF_LONG, Long.MAX_VALUE); - buffer.putLongOrdered(offset + SIZE_OF_LONG, Long.MAX_VALUE); - buffer.addLongOrdered(offset + SIZE_OF_LONG, Long.MAX_VALUE); + buffer.putLongRelease(offset + SIZE_OF_LONG, Long.MAX_VALUE); + buffer.addLongRelease(offset + SIZE_OF_LONG, Long.MAX_VALUE); buffer.getIntVolatile(offset + SIZE_OF_INT); buffer.putIntVolatile(offset + SIZE_OF_INT, Integer.MAX_VALUE); buffer.compareAndSetInt(offset + SIZE_OF_INT, Integer.MAX_VALUE, Integer.MAX_VALUE); buffer.getAndAddInt(offset + SIZE_OF_INT, Integer.MAX_VALUE); buffer.getAndSetInt(offset + SIZE_OF_INT, Integer.MAX_VALUE); - buffer.putIntOrdered(offset + SIZE_OF_INT, Integer.MAX_VALUE); - buffer.addIntOrdered(offset + SIZE_OF_INT, Integer.MAX_VALUE); + buffer.putIntRelease(offset + SIZE_OF_INT, Integer.MAX_VALUE); + buffer.addIntRelease(offset + SIZE_OF_INT, Integer.MAX_VALUE); buffer.getShortVolatile(offset + SIZE_OF_SHORT); buffer.putShortVolatile(offset + SIZE_OF_SHORT, Short.MAX_VALUE); @@ -319,8 +319,8 @@ private void testUnAlignedAtomicMethods(final AtomicBuffer buffer, final int off addressOffset, offset + SIZE_OF_INT, (i) -> buffer.compareAndSetLong(i, Long.MAX_VALUE, Long.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.getAndAddLong(i, Long.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.getAndSetLong(i, Long.MAX_VALUE)); - assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.putLongOrdered(i, Long.MAX_VALUE)); - assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.addLongOrdered(i, Long.MAX_VALUE)); + assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.putLongRelease(i, Long.MAX_VALUE)); + assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.addLongRelease(i, Long.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, buffer::getIntVolatile); assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntVolatile(i, Integer.MAX_VALUE)); @@ -328,8 +328,8 @@ private void testUnAlignedAtomicMethods(final AtomicBuffer buffer, final int off (i) -> buffer.compareAndSetInt(i, Integer.MAX_VALUE, Integer.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.getAndAddInt(i, Integer.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.getAndSetInt(i, Integer.MAX_VALUE)); - assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntOrdered(i, Integer.MAX_VALUE)); - assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.addIntOrdered(i, Integer.MAX_VALUE)); + assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntRelease(i, Integer.MAX_VALUE)); + assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.addIntRelease(i, Integer.MAX_VALUE)); assertUnaligned(addressOffset, offset + SIZE_OF_BYTE, buffer::getShortVolatile); assertUnaligned(addressOffset, offset + SIZE_OF_BYTE, (i) -> buffer.putShortVolatile(i, Short.MAX_VALUE)); diff --git a/agrona-concurrency-tests/src/main/java/org/agrona/concurrent/UnsafeBufferTests.java b/agrona-concurrency-tests/src/main/java/org/agrona/concurrent/UnsafeBufferTests.java index 65ad93dbf..47b9175b4 100644 --- a/agrona-concurrency-tests/src/main/java/org/agrona/concurrent/UnsafeBufferTests.java +++ b/agrona-concurrency-tests/src/main/java/org/agrona/concurrent/UnsafeBufferTests.java @@ -40,7 +40,7 @@ public class UnsafeBufferTests /** * Test that verifies the atomicity of the {@link UnsafeBuffer#putLongVolatile(int, long)}, - * {@link UnsafeBuffer#putLongOrdered(int, long)} and {@link UnsafeBuffer#getLongVolatile(int)}. + * {@link UnsafeBuffer#putLongRelease(int, long)} (int, long)} and {@link UnsafeBuffer#getLongVolatile(int)}. */ @JCStressTest @Outcome(id = "0", expect = Expect.ACCEPTABLE, desc = "read before writes") @@ -71,7 +71,7 @@ public void putLongVolatile() @Actor public void putLongOrdered() { - buffer.putLongOrdered(WRITE_INDEX, Long.MAX_VALUE - 1); + buffer.putLongRelease(WRITE_INDEX, Long.MAX_VALUE - 1); } /** @@ -88,7 +88,7 @@ public void actor2(final J_Result result) /** * Test that verifies the atomicity of the {@link UnsafeBuffer#putIntVolatile(int, int)}, - * {@link UnsafeBuffer#putIntOrdered(int, int)} and {@link UnsafeBuffer#getIntVolatile(int)}. + * {@link UnsafeBuffer#putIntRelease(int, int)} (int, int)} and {@link UnsafeBuffer#getIntVolatile(int)}. */ @JCStressTest @Outcome(id = "0", expect = Expect.ACCEPTABLE, desc = "read before writes") @@ -119,7 +119,7 @@ public void putIntVolatile() @Actor public void putIntOrdered() { - buffer.putIntOrdered(WRITE_INDEX, 222222222); + buffer.putIntRelease(WRITE_INDEX, 222222222); } /** diff --git a/agrona/src/main/java/org/agrona/MarkFile.java b/agrona/src/main/java/org/agrona/MarkFile.java index bcf1a70d6..6d31b18fb 100644 --- a/agrona/src/main/java/org/agrona/MarkFile.java +++ b/agrona/src/main/java/org/agrona/MarkFile.java @@ -259,7 +259,7 @@ public void close() */ public void signalReady(final int version) { - buffer.putIntOrdered(versionFieldOffset, version); + buffer.putIntRelease(versionFieldOffset, version); } /** @@ -289,7 +289,7 @@ public int versionWeak() */ public void timestampOrdered(final long timestamp) { - buffer.putLongOrdered(timestampFieldOffset, timestamp); + buffer.putLongRelease(timestampFieldOffset, timestamp); } /** diff --git a/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java b/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java index d2b61910a..ede9f6083 100644 --- a/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java +++ b/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java @@ -22,6 +22,13 @@ /** * Abstraction over a range of buffer types that allows type to be accessed with various memory ordering semantics. + *
+ * Before Java 9, there was no standard for stores with release semantics. On the AtomicLong there was the + * {@link java.util.concurrent.atomic.AtomicLong#lazySet(long)}. Because there was no standard, the AtomicBuffer + * has methods like {@link #putLongOrdered(int, long)}. With Java 9, the 'release' name has been introduced. + * The AtomicBuffer also has methods with release methods which are identical to the ordered methods. All the + * methods with 'ordered' semantics will call the equivalent method with release semantics. This introduces a small + * performance penalty and should encourage users to switch to the newer methods. */ public interface AtomicBuffer extends MutableDirectBuffer { @@ -53,8 +60,8 @@ public interface AtomicBuffer extends MutableDirectBuffer * Verify that the underlying buffer is correctly aligned to prevent word tearing, other ordering issues and * the JVM crashes. In particular this method verifies that the starting offset of the underlying buffer is properly * aligned. However, the actual atomic call must ensure that the index is properly aligned, i.e. it must be aligned - * to the size of the operand. For example a call to any of the following methods {@link #putIntOrdered(int, int)}, - * {@link #putIntVolatile(int, int)}, {@link #addIntOrdered(int, int)}, {@link #getIntVolatile(int)}, + * to the size of the operand. For example a call to any of the following methods {@link #putIntRelease(int, int)}, + * {@link #putIntVolatile(int, int)}, {@link #addIntRelease(int, int)} (int, int)}, {@link #getIntVolatile(int)}, * {@link #getAndAddInt(int, int)} or {@link #getAndSetInt(int, int)}, must have the index aligned by four bytes * (e.g. {@code 0, 4, 8, 12, 60 etc.}). *
@@ -88,6 +95,14 @@ public interface AtomicBuffer extends MutableDirectBuffer */ long getLongVolatile(int index); + /** + * Atomically get the value at a given index with acquire semantics. + * + * @param index in bytes from which to get. + * @return the value for at a given index. + */ + long getLongAcquire(int index); + /** * Atomically put a value to a given index with volatile semantics. *
@@ -101,17 +116,29 @@ public interface AtomicBuffer extends MutableDirectBuffer /** * Atomically put a value to a given index with ordered store semantics. *
- * This call has release semantics. + * Instead of using this method, use {@link #putLongRelease(int, long)} instead. They + * are identical and the putLongRelease is the preferred version. * * @param index in bytes for where to put. * @param value for at a given index. */ void putLongOrdered(int index, long value); + /** + * Atomically put a value to a given index with release semantics. + * + * @param index in bytes for where to put. + * @param value for at a given index. + */ + void putLongRelease(int index, long value); + /** * Atomically adds a value to a given index with ordered store semantics. Use a negative increment to decrement. *
* The load has no ordering semantics. The store has release semantics. + *
+ * Instead of using this method, use {@link #addLongRelease(int, long)} instead. They + * are identical but the addLongRelease is the preferred version. * * @param index in bytes for where to put. * @param increment by which the value at the index will be adjusted. @@ -119,6 +146,17 @@ public interface AtomicBuffer extends MutableDirectBuffer */ long addLongOrdered(int index, long increment); + /** + * Atomically adds a value to a given index with ordered store semantics. Use a negative increment to decrement. + *
+ * The load has no ordering semantics. The store has release semantics. + * + * @param index in bytes for where to put. + * @param increment by which the value at the index will be adjusted. + * @return the previous value at the index. + */ + long addLongRelease(int index, long increment); + /** * Atomic compare and set of a long given an expected value. *
@@ -174,20 +212,40 @@ public interface AtomicBuffer extends MutableDirectBuffer */ void putIntVolatile(int index, int value); + /** + * Atomically get the value at a given index with acquire semantics. + * + * @param index in bytes from which to get. + * @return the value for at a given index. + */ + int getIntAcquire(int index); + /** * Atomically put a value to a given index with ordered semantics. *
- * This call has release semantics. - * + * Instead of using this method, use {@link #putIntRelease} instead. They + * are identical but the putIntRelease is the preferred version. + * @param index in bytes for where to put. * @param value for at a given index. */ void putIntOrdered(int index, int value); + /** + * Atomically put a value to a given index with release semantics. + * + * @param index in bytes for where to put. + * @param value for at a given index. + */ + void putIntRelease(int index, int value); + /** * Atomically add a value to a given index with ordered store semantics. Use a negative increment to decrement. *
* The load has no ordering semantics. The store has release semantics. + *
+ * Instead of using this method, use {@link #addIntRelease(int, int)} instead. They + * are identical but the addIntRelease is the preferred version. * * @param index in bytes for where to put. * @param increment by which the value at the index will be adjusted. @@ -195,6 +253,17 @@ public interface AtomicBuffer extends MutableDirectBuffer */ int addIntOrdered(int index, int increment); + /** + * Atomically add a value to a given index with release semantics. Use a negative increment to decrement. + *
+ * The load has no ordering semantics. The store has release semantics. + * + * @param index in bytes for where to put. + * @param increment by which the value at the index will be adjusted. + * @return the previous value at the index. + */ + int addIntRelease(int index, int increment); + /** * Atomic compare and set of an int given an expected value. *
diff --git a/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java b/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java
index cf2a9f7b6..355c61d4c 100644
--- a/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java
+++ b/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java
@@ -398,10 +398,31 @@ public void putLongVolatile(final int index, final long value)
UnsafeApi.putLongVolatile(byteArray, addressOffset + index, value);
}
+ /**
+ * {@inheritDoc}
+ */
+ public long getLongAcquire(final int index)
+ {
+ if (SHOULD_BOUNDS_CHECK)
+ {
+ boundsCheck0(index, SIZE_OF_LONG);
+ }
+
+ return UnsafeApi.getLongAcquire(byteArray, addressOffset + index);
+ }
+
/**
* {@inheritDoc}
*/
public void putLongOrdered(final int index, final long value)
+ {
+ putLongRelease(index, value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void putLongRelease(final int index, final long value)
{
if (SHOULD_BOUNDS_CHECK)
{
@@ -415,6 +436,14 @@ public void putLongOrdered(final int index, final long value)
* {@inheritDoc}
*/
public long addLongOrdered(final int index, final long increment)
+ {
+ return addLongRelease(index, increment);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public long addLongRelease(final int index, final long increment)
{
if (SHOULD_BOUNDS_CHECK)
{
@@ -489,10 +518,31 @@ public void putIntVolatile(final int index, final int value)
UnsafeApi.putIntVolatile(byteArray, addressOffset + index, value);
}
+ /**
+ * {@inheritDoc}
+ */
+ public int getIntAcquire(final int index)
+ {
+ if (SHOULD_BOUNDS_CHECK)
+ {
+ boundsCheck0(index, SIZE_OF_INT);
+ }
+
+ return UnsafeApi.getIntAcquire(byteArray, addressOffset + index);
+ }
+
/**
* {@inheritDoc}
*/
public void putIntOrdered(final int index, final int value)
+ {
+ putIntRelease(index, value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void putIntRelease(final int index, final int value)
{
if (SHOULD_BOUNDS_CHECK)
{
@@ -506,6 +556,14 @@ public void putIntOrdered(final int index, final int value)
* {@inheritDoc}
*/
public int addIntOrdered(final int index, final int increment)
+ {
+ return addIntRelease(index, increment);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int addIntRelease(final int index, final int increment)
{
if (SHOULD_BOUNDS_CHECK)
{
diff --git a/agrona/src/main/java/org/agrona/concurrent/broadcast/BroadcastTransmitter.java b/agrona/src/main/java/org/agrona/concurrent/broadcast/BroadcastTransmitter.java
index 06e5d5ead..c82d50a0c 100644
--- a/agrona/src/main/java/org/agrona/concurrent/broadcast/BroadcastTransmitter.java
+++ b/agrona/src/main/java/org/agrona/concurrent/broadcast/BroadcastTransmitter.java
@@ -122,13 +122,13 @@ public void transmit(final int msgTypeId, final DirectBuffer srcBuffer, final in
buffer.putBytes(msgOffset(recordOffset), srcBuffer, srcIndex, length);
- buffer.putLongOrdered(latestCounterIndex, currentTail);
- buffer.putLongOrdered(tailCounterIndex, currentTail + recordLengthAligned);
+ buffer.putLongRelease(latestCounterIndex, currentTail);
+ buffer.putLongRelease(tailCounterIndex, currentTail + recordLengthAligned);
}
private void signalTailIntent(final AtomicBuffer buffer, final long newTail)
{
- buffer.putLongOrdered(tailIntentCountIndex, newTail);
+ buffer.putLongRelease(tailIntentCountIndex, newTail);
VarHandle.releaseFence();
}
diff --git a/agrona/src/main/java/org/agrona/concurrent/errors/DistinctErrorLog.java b/agrona/src/main/java/org/agrona/concurrent/errors/DistinctErrorLog.java
index fdd521a74..3002d790b 100644
--- a/agrona/src/main/java/org/agrona/concurrent/errors/DistinctErrorLog.java
+++ b/agrona/src/main/java/org/agrona/concurrent/errors/DistinctErrorLog.java
@@ -177,7 +177,7 @@ public boolean record(final Throwable observation)
final int offset = distinctObservation.offset;
buffer.getAndAddInt(offset + OBSERVATION_COUNT_OFFSET, 1);
- buffer.putLongOrdered(offset + LAST_OBSERVATION_TIMESTAMP_OFFSET, timestampMs);
+ buffer.putLongRelease(offset + LAST_OBSERVATION_TIMESTAMP_OFFSET, timestampMs);
return true;
}
@@ -279,7 +279,7 @@ private DistinctObservation newObservation(final long timestampMs, final Throwab
final DistinctObservation distinctObservation = new DistinctObservation(observation, offset);
distinctObservations = prepend(distinctObservations, distinctObservation);
- buffer.putIntOrdered(offset + LENGTH_OFFSET, length);
+ buffer.putIntRelease(offset + LENGTH_OFFSET, length);
return distinctObservation;
}
diff --git a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java
index edc51f72e..7ca2a65ae 100644
--- a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java
+++ b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java
@@ -97,12 +97,12 @@ public boolean write(final int msgTypeId, final DirectBuffer srcBuffer, final in
return false;
}
- buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength);
+ buffer.putIntRelease(lengthOffset(recordIndex), -recordLength);
VarHandle.releaseFence();
buffer.putBytes(encodedMsgOffset(recordIndex), srcBuffer, offset, length);
buffer.putInt(typeOffset(recordIndex), msgTypeId);
- buffer.putIntOrdered(lengthOffset(recordIndex), recordLength);
+ buffer.putIntRelease(lengthOffset(recordIndex), recordLength);
return true;
}
@@ -124,7 +124,7 @@ public int tryClaim(final int msgTypeId, final int length)
return recordIndex;
}
- buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength);
+ buffer.putIntRelease(lengthOffset(recordIndex), -recordLength);
VarHandle.releaseFence();
buffer.putInt(typeOffset(recordIndex), msgTypeId);
@@ -140,7 +140,7 @@ public void commit(final int index)
final AtomicBuffer buffer = this.buffer;
final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex);
- buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength);
+ buffer.putIntRelease(lengthOffset(recordIndex), -recordLength);
}
/**
@@ -153,7 +153,7 @@ public void abort(final int index)
final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex);
buffer.putInt(typeOffset(recordIndex), PADDING_MSG_TYPE_ID);
- buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength);
+ buffer.putIntRelease(lengthOffset(recordIndex), -recordLength);
}
/**
@@ -208,7 +208,7 @@ public int read(final MessageHandler handler, final int messageCountLimit)
if (bytesRead > 0)
{
buffer.setMemory(headIndex, bytesRead, (byte)0);
- buffer.putLongOrdered(headPositionIndex, head + bytesRead);
+ buffer.putLongRelease(headPositionIndex, head + bytesRead);
}
}
@@ -277,7 +277,7 @@ public int controlledRead(final ControlledMessageHandler handler, final int mess
if (COMMIT == action)
{
buffer.setMemory(headIndex, bytesRead, (byte)0);
- buffer.putLongOrdered(headPositionIndex, head + bytesRead);
+ buffer.putLongRelease(headPositionIndex, head + bytesRead);
headIndex += bytesRead;
head += bytesRead;
bytesRead = 0;
@@ -289,7 +289,7 @@ public int controlledRead(final ControlledMessageHandler handler, final int mess
if (bytesRead > 0)
{
buffer.setMemory(headIndex, bytesRead, (byte)0);
- buffer.putLongOrdered(headPositionIndex, head + bytesRead);
+ buffer.putLongRelease(headPositionIndex, head + bytesRead);
}
}
@@ -325,7 +325,7 @@ public AtomicBuffer buffer()
*/
public void consumerHeartbeatTime(final long time)
{
- buffer.putLongOrdered(consumerHeartbeatIndex, time);
+ buffer.putLongRelease(consumerHeartbeatIndex, time);
}
/**
@@ -408,7 +408,7 @@ public boolean unblock()
if (length < 0)
{
buffer.putInt(typeOffset(consumerIndex), PADDING_MSG_TYPE_ID);
- buffer.putIntOrdered(lengthOffset(consumerIndex), -length);
+ buffer.putIntRelease(lengthOffset(consumerIndex), -length);
unblocked = true;
}
else if (0 == length)
@@ -426,7 +426,7 @@ else if (0 == length)
if (scanBackToConfirmStillZeroed(buffer, i, consumerIndex))
{
buffer.putInt(typeOffset(consumerIndex), PADDING_MSG_TYPE_ID);
- buffer.putIntOrdered(lengthOffset(consumerIndex), i - consumerIndex);
+ buffer.putIntRelease(lengthOffset(consumerIndex), i - consumerIndex);
unblocked = true;
}
@@ -501,7 +501,7 @@ private int claimCapacity(final AtomicBuffer buffer, final int recordLength)
return INSUFFICIENT_CAPACITY;
}
- buffer.putLongOrdered(headCachePositionIndex, head);
+ buffer.putLongRelease(headCachePositionIndex, head);
}
newTail = tail + requiredCapacity;
@@ -525,7 +525,7 @@ private int claimCapacity(final AtomicBuffer buffer, final int recordLength)
newTail = tail; // Do not claim any actual space, only pad to the buffer end
}
- buffer.putLongOrdered(headCachePositionIndex, head);
+ buffer.putLongRelease(headCachePositionIndex, head);
}
padding = toBufferEndLength;
@@ -536,11 +536,11 @@ private int claimCapacity(final AtomicBuffer buffer, final int recordLength)
if (0 != padding)
{
- buffer.putIntOrdered(lengthOffset(tailIndex), -padding);
+ buffer.putIntRelease(lengthOffset(tailIndex), -padding);
VarHandle.releaseFence();
buffer.putInt(typeOffset(tailIndex), PADDING_MSG_TYPE_ID);
- buffer.putIntOrdered(lengthOffset(tailIndex), padding);
+ buffer.putIntRelease(lengthOffset(tailIndex), padding);
}
return writeIndex;
diff --git a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java
index 2425e0ffc..e83d86789 100644
--- a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java
+++ b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java
@@ -100,12 +100,12 @@ public boolean write(final int msgTypeId, final DirectBuffer srcBuffer, final in
return false;
}
- buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength);
+ buffer.putIntRelease(lengthOffset(recordIndex), -recordLength);
VarHandle.releaseFence();
buffer.putBytes(encodedMsgOffset(recordIndex), srcBuffer, offset, length);
buffer.putInt(typeOffset(recordIndex), msgTypeId);
- buffer.putIntOrdered(lengthOffset(recordIndex), recordLength);
+ buffer.putIntRelease(lengthOffset(recordIndex), recordLength);
return true;
}
@@ -127,7 +127,7 @@ public int tryClaim(final int msgTypeId, final int length)
return recordIndex;
}
- buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength);
+ buffer.putIntRelease(lengthOffset(recordIndex), -recordLength);
VarHandle.releaseFence();
buffer.putInt(typeOffset(recordIndex), msgTypeId);
@@ -143,7 +143,7 @@ public void commit(final int index)
final AtomicBuffer buffer = this.buffer;
final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex);
- buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength);
+ buffer.putIntRelease(lengthOffset(recordIndex), -recordLength);
}
/**
@@ -156,7 +156,7 @@ public void abort(final int index)
final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex);
buffer.putInt(typeOffset(recordIndex), PADDING_MSG_TYPE_ID);
- buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength);
+ buffer.putIntRelease(lengthOffset(recordIndex), -recordLength);
}
/**
@@ -211,7 +211,7 @@ public int read(final MessageHandler handler, final int messageCountLimit)
{
if (bytesRead > 0)
{
- buffer.putLongOrdered(headPositionIndex, head + bytesRead);
+ buffer.putLongRelease(headPositionIndex, head + bytesRead);
}
}
@@ -280,7 +280,7 @@ public int controlledRead(final ControlledMessageHandler handler, final int mess
}
if (COMMIT == action)
{
- buffer.putLongOrdered(headPositionIndex, head + bytesRead);
+ buffer.putLongRelease(headPositionIndex, head + bytesRead);
headIndex += bytesRead;
head += bytesRead;
bytesRead = 0;
@@ -291,7 +291,7 @@ public int controlledRead(final ControlledMessageHandler handler, final int mess
{
if (bytesRead > 0)
{
- buffer.putLongOrdered(headPositionIndex, head + bytesRead);
+ buffer.putLongRelease(headPositionIndex, head + bytesRead);
}
}
@@ -327,7 +327,7 @@ public AtomicBuffer buffer()
*/
public void consumerHeartbeatTime(final long time)
{
- buffer.putLongOrdered(consumerHeartbeatIndex, time);
+ buffer.putLongRelease(consumerHeartbeatIndex, time);
}
/**
@@ -441,7 +441,7 @@ private int claimCapacity(final AtomicBuffer buffer, final int recordLength)
if (alignedRecordLength == toBufferEndLength) // message fits within the end of the buffer
{
- buffer.putLongOrdered(tailPositionIndex, nextTail);
+ buffer.putLongRelease(tailPositionIndex, nextTail);
buffer.putLong(0, 0L); // pre-zero next message header
return recordIndex;
}
@@ -467,16 +467,16 @@ else if (requiredCapacity > toBufferEndLength)
nextTail += padding;
}
- buffer.putLongOrdered(tailPositionIndex, nextTail);
+ buffer.putLongRelease(tailPositionIndex, nextTail);
if (0 != padding)
{
buffer.putLong(0, 0L);
- buffer.putIntOrdered(lengthOffset(recordIndex), -padding);
+ buffer.putIntRelease(lengthOffset(recordIndex), -padding);
VarHandle.releaseFence();
buffer.putInt(typeOffset(recordIndex), PADDING_MSG_TYPE_ID);
- buffer.putIntOrdered(lengthOffset(recordIndex), padding);
+ buffer.putIntRelease(lengthOffset(recordIndex), padding);
}
if (INSUFFICIENT_CAPACITY != writeIndex)
diff --git a/agrona/src/main/java/org/agrona/concurrent/status/CountersManager.java b/agrona/src/main/java/org/agrona/concurrent/status/CountersManager.java
index 3e75333b5..aec0dda0e 100644
--- a/agrona/src/main/java/org/agrona/concurrent/status/CountersManager.java
+++ b/agrona/src/main/java/org/agrona/concurrent/status/CountersManager.java
@@ -209,7 +209,7 @@ public int allocate(final String label, final int typeId)
metaDataBuffer.putLong(recordOffset + FREE_FOR_REUSE_DEADLINE_OFFSET, NOT_FREE_TO_REUSE);
putLabel(recordOffset, label);
- metaDataBuffer.putIntOrdered(recordOffset, RECORD_ALLOCATED);
+ metaDataBuffer.putIntRelease(recordOffset, RECORD_ALLOCATED);
}
catch (final Exception ex)
{
@@ -244,7 +244,7 @@ public int allocate(final String label, final int typeId, final Consumer