Skip to content

Commit

Permalink
AtomicBuffer acquire/release operations.
Browse files Browse the repository at this point in the history
The AtomicBuffer has release method for every ordered method. So for
e.g. a AtomicBUffer.putLongOrdered, there is a putLongRelease.

The ordered method will call the release version, so there is a sligth
performance penalty.

Also acquire get methods have been added that have slightly weaker
memory ordering semantics compared to a volatile read. Which could
provide more oppertunity for the JIT to do its magic and in theory
could give better performance on ISAs with weaker memory models.
  • Loading branch information
pveentjer committed Jan 16, 2025
1 parent d3cfe6a commit 15ede48
Show file tree
Hide file tree
Showing 15 changed files with 293 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,16 +284,16 @@ private void testAlignedAtomicMethods(final AtomicBuffer buffer, final int offse
buffer.compareAndSetLong(offset + SIZE_OF_LONG, Long.MAX_VALUE, Long.MAX_VALUE);
buffer.getAndAddLong(offset + SIZE_OF_LONG, Long.MAX_VALUE);
buffer.getAndSetLong(offset + SIZE_OF_LONG, Long.MAX_VALUE);
buffer.putLongOrdered(offset + SIZE_OF_LONG, Long.MAX_VALUE);
buffer.addLongOrdered(offset + SIZE_OF_LONG, Long.MAX_VALUE);
buffer.putLongRelease(offset + SIZE_OF_LONG, Long.MAX_VALUE);
buffer.addLongRelease(offset + SIZE_OF_LONG, Long.MAX_VALUE);

buffer.getIntVolatile(offset + SIZE_OF_INT);
buffer.putIntVolatile(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.compareAndSetInt(offset + SIZE_OF_INT, Integer.MAX_VALUE, Integer.MAX_VALUE);
buffer.getAndAddInt(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.getAndSetInt(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.putIntOrdered(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.addIntOrdered(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.putIntRelease(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.addIntRelease(offset + SIZE_OF_INT, Integer.MAX_VALUE);

buffer.getShortVolatile(offset + SIZE_OF_SHORT);
buffer.putShortVolatile(offset + SIZE_OF_SHORT, Short.MAX_VALUE);
Expand All @@ -319,17 +319,17 @@ private void testUnAlignedAtomicMethods(final AtomicBuffer buffer, final int off
addressOffset, offset + SIZE_OF_INT, (i) -> buffer.compareAndSetLong(i, Long.MAX_VALUE, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.getAndAddLong(i, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.getAndSetLong(i, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.putLongOrdered(i, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.addLongOrdered(i, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.putLongRelease(i, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.addLongRelease(i, Long.MAX_VALUE));

assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, buffer::getIntVolatile);
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntVolatile(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT,
(i) -> buffer.compareAndSetInt(i, Integer.MAX_VALUE, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.getAndAddInt(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.getAndSetInt(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntOrdered(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.addIntOrdered(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntRelease(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.addIntRelease(i, Integer.MAX_VALUE));

assertUnaligned(addressOffset, offset + SIZE_OF_BYTE, buffer::getShortVolatile);
assertUnaligned(addressOffset, offset + SIZE_OF_BYTE, (i) -> buffer.putShortVolatile(i, Short.MAX_VALUE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ public class UnsafeBufferTests

/**
* Test that verifies the atomicity of the {@link UnsafeBuffer#putLongVolatile(int, long)},
* {@link UnsafeBuffer#putLongOrdered(int, long)} and {@link UnsafeBuffer#getLongVolatile(int)}.
* {@link UnsafeBuffer#putLongRelease(int, long)} (int, long)} and {@link UnsafeBuffer#getLongVolatile(int)}.
*/
@JCStressTest
@Outcome(id = "0", expect = Expect.ACCEPTABLE, desc = "read before writes")
@Outcome(id = "-1", expect = Expect.ACCEPTABLE, desc = "putLongVolatile before read")
@Outcome(id = "9223372036854775806", expect = Expect.ACCEPTABLE, desc = "putLongOrdered before read")
@Outcome(id = "9223372036854775806", expect = Expect.ACCEPTABLE, desc = "putLongRelease before read")
@State
public static class DirectBufferLong
{
Expand All @@ -69,9 +69,9 @@ public void putLongVolatile()
* Writer thread.
*/
@Actor
public void putLongOrdered()
public void putLongRelease()
{
buffer.putLongOrdered(WRITE_INDEX, Long.MAX_VALUE - 1);
buffer.putLongRelease(WRITE_INDEX, Long.MAX_VALUE - 1);
}

/**
Expand All @@ -88,7 +88,7 @@ public void actor2(final J_Result result)

/**
* Test that verifies the atomicity of the {@link UnsafeBuffer#putIntVolatile(int, int)},
* {@link UnsafeBuffer#putIntOrdered(int, int)} and {@link UnsafeBuffer#getIntVolatile(int)}.
* {@link UnsafeBuffer#putIntRelease(int, int)} (int, int)} and {@link UnsafeBuffer#getIntVolatile(int)}.
*/
@JCStressTest
@Outcome(id = "0", expect = Expect.ACCEPTABLE, desc = "read before writes")
Expand Down Expand Up @@ -117,9 +117,9 @@ public void putIntVolatile()
* Writer thread.
*/
@Actor
public void putIntOrdered()
public void putIntRelease()
{
buffer.putIntOrdered(WRITE_INDEX, 222222222);
buffer.putIntRelease(WRITE_INDEX, 222222222);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions agrona/src/main/java/org/agrona/MarkFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void close()
*/
public void signalReady(final int version)
{
buffer.putIntOrdered(versionFieldOffset, version);
buffer.putIntRelease(versionFieldOffset, version);
}

/**
Expand Down Expand Up @@ -289,7 +289,7 @@ public int versionWeak()
*/
public void timestampOrdered(final long timestamp)
{
buffer.putLongOrdered(timestampFieldOffset, timestamp);
buffer.putLongRelease(timestampFieldOffset, timestamp);
}

/**
Expand Down
79 changes: 74 additions & 5 deletions agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@

/**
* Abstraction over a range of buffer types that allows type to be accessed with various memory ordering semantics.
* <p>
* Before Java 9, there was no standard for stores with release semantics. On the AtomicLong there was the
* {@link java.util.concurrent.atomic.AtomicLong#lazySet(long)}. Because there was no standard, the AtomicBuffer
* has methods like {@link #putLongOrdered(int, long)}. With Java 9, the 'release' name has been introduced.
* The AtomicBuffer also has methods with release methods which are identical to the ordered methods. All the
* methods with 'ordered' semantics will call the equivalent method with release semantics. This introduces a small
* performance penalty and should encourage users to switch to the newer methods.
*/
public interface AtomicBuffer extends MutableDirectBuffer
{
Expand Down Expand Up @@ -53,8 +60,8 @@ public interface AtomicBuffer extends MutableDirectBuffer
* Verify that the underlying buffer is correctly aligned to prevent word tearing, other ordering issues and
* the JVM crashes. In particular this method verifies that the starting offset of the underlying buffer is properly
* aligned. However, the actual atomic call must ensure that the index is properly aligned, i.e. it must be aligned
* to the size of the operand. For example a call to any of the following methods {@link #putIntOrdered(int, int)},
* {@link #putIntVolatile(int, int)}, {@link #addIntOrdered(int, int)}, {@link #getIntVolatile(int)},
* to the size of the operand. For example a call to any of the following methods {@link #putIntRelease(int, int)},
* {@link #putIntVolatile(int, int)}, {@link #addIntRelease(int, int)} (int, int)}, {@link #getIntVolatile(int)},
* {@link #getAndAddInt(int, int)} or {@link #getAndSetInt(int, int)}, must have the index aligned by four bytes
* (e.g. {@code 0, 4, 8, 12, 60 etc.}).
* <p>
Expand Down Expand Up @@ -88,6 +95,14 @@ public interface AtomicBuffer extends MutableDirectBuffer
*/
long getLongVolatile(int index);

/**
* Atomically get the value at a given index with acquire semantics.
*
* @param index in bytes from which to get.
* @return the value for at a given index.
*/
long getLongAcquire(int index);

/**
* Atomically put a value to a given index with volatile semantics.
* <p>
Expand All @@ -101,24 +116,47 @@ public interface AtomicBuffer extends MutableDirectBuffer
/**
* Atomically put a value to a given index with ordered store semantics.
* <p>
* This call has release semantics.
* Instead of using this method, use {@link #putLongRelease(int, long)} instead. They
* are identical and the putLongRelease is the preferred version.
*
* @param index in bytes for where to put.
* @param value for at a given index.
*/
void putLongOrdered(int index, long value);

/**
* Atomically put a value to a given index with release semantics.
*
* @param index in bytes for where to put.
* @param value for at a given index.
*/
void putLongRelease(int index, long value);

/**
* Atomically adds a value to a given index with ordered store semantics. Use a negative increment to decrement.
* <p>
* The load has no ordering semantics. The store has release semantics.
* <p>
* Instead of using this method, use {@link #addLongRelease(int, long)} instead. They
* are identical but the addLongRelease is the preferred version.
*
* @param index in bytes for where to put.
* @param increment by which the value at the index will be adjusted.
* @return the previous value at the index.
*/
long addLongOrdered(int index, long increment);

/**
* Atomically adds a value to a given index with ordered store semantics. Use a negative increment to decrement.
* <p>
* The load has no ordering semantics. The store has release semantics.
*
* @param index in bytes for where to put.
* @param increment by which the value at the index will be adjusted.
* @return the previous value at the index.
*/
long addLongRelease(int index, long increment);

/**
* Atomic compare and set of a long given an expected value.
* <p>
Expand Down Expand Up @@ -174,27 +212,58 @@ public interface AtomicBuffer extends MutableDirectBuffer
*/
void putIntVolatile(int index, int value);

/**
* Atomically get the value at a given index with acquire semantics.
*
* @param index in bytes from which to get.
* @return the value for at a given index.
*/
int getIntAcquire(int index);

/**
* Atomically put a value to a given index with ordered semantics.
* <p>
* This call has release semantics.
*
* Instead of using this method, use {@link #putIntRelease} instead. They
* are identical but the putIntRelease is the preferred version.
* @param index in bytes for where to put.
* @param value for at a given index.
*/
void putIntOrdered(int index, int value);

/**
* Atomically put a value to a given index with release semantics.
*
* @param index in bytes for where to put.
* @param value for at a given index.
*/
void putIntRelease(int index, int value);

/**
* Atomically add a value to a given index with ordered store semantics. Use a negative increment to decrement.
* <p>
* The load has no ordering semantics. The store has release semantics.
* <p>
* Instead of using this method, use {@link #addIntRelease(int, int)} instead. They
* are identical but the addIntRelease is the preferred version.
*
* @param index in bytes for where to put.
* @param increment by which the value at the index will be adjusted.
* @return the previous value at the index.
*/
int addIntOrdered(int index, int increment);

/**
* Atomically add a value to a given index with release semantics. Use a negative increment to decrement.
* <p>
* The load has no ordering semantics. The store has release semantics.
*
* @param index in bytes for where to put.
* @param increment by which the value at the index will be adjusted.
* @return the previous value at the index.
*/
int addIntRelease(int index, int increment);

/**
* Atomic compare and set of an int given an expected value.
* <p>
Expand Down
58 changes: 58 additions & 0 deletions agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,31 @@ public void putLongVolatile(final int index, final long value)
UnsafeApi.putLongVolatile(byteArray, addressOffset + index, value);
}

/**
* {@inheritDoc}
*/
public long getLongAcquire(final int index)
{
if (SHOULD_BOUNDS_CHECK)
{
boundsCheck0(index, SIZE_OF_LONG);
}

return UnsafeApi.getLongAcquire(byteArray, addressOffset + index);
}

/**
* {@inheritDoc}
*/
public void putLongOrdered(final int index, final long value)
{
putLongRelease(index, value);
}

/**
* {@inheritDoc}
*/
public void putLongRelease(final int index, final long value)
{
if (SHOULD_BOUNDS_CHECK)
{
Expand All @@ -415,6 +436,14 @@ public void putLongOrdered(final int index, final long value)
* {@inheritDoc}
*/
public long addLongOrdered(final int index, final long increment)
{
return addLongRelease(index, increment);
}

/**
* {@inheritDoc}
*/
public long addLongRelease(final int index, final long increment)
{
if (SHOULD_BOUNDS_CHECK)
{
Expand Down Expand Up @@ -489,10 +518,31 @@ public void putIntVolatile(final int index, final int value)
UnsafeApi.putIntVolatile(byteArray, addressOffset + index, value);
}

/**
* {@inheritDoc}
*/
public int getIntAcquire(final int index)
{
if (SHOULD_BOUNDS_CHECK)
{
boundsCheck0(index, SIZE_OF_INT);
}

return UnsafeApi.getIntAcquire(byteArray, addressOffset + index);
}

/**
* {@inheritDoc}
*/
public void putIntOrdered(final int index, final int value)
{
putIntRelease(index, value);
}

/**
* {@inheritDoc}
*/
public void putIntRelease(final int index, final int value)
{
if (SHOULD_BOUNDS_CHECK)
{
Expand All @@ -506,6 +556,14 @@ public void putIntOrdered(final int index, final int value)
* {@inheritDoc}
*/
public int addIntOrdered(final int index, final int increment)
{
return addIntRelease(index, increment);
}

/**
* {@inheritDoc}
*/
public int addIntRelease(final int index, final int increment)
{
if (SHOULD_BOUNDS_CHECK)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ public void transmit(final int msgTypeId, final DirectBuffer srcBuffer, final in

buffer.putBytes(msgOffset(recordOffset), srcBuffer, srcIndex, length);

buffer.putLongOrdered(latestCounterIndex, currentTail);
buffer.putLongOrdered(tailCounterIndex, currentTail + recordLengthAligned);
buffer.putLongRelease(latestCounterIndex, currentTail);
buffer.putLongRelease(tailCounterIndex, currentTail + recordLengthAligned);
}

private void signalTailIntent(final AtomicBuffer buffer, final long newTail)
{
buffer.putLongOrdered(tailIntentCountIndex, newTail);
buffer.putLongRelease(tailIntentCountIndex, newTail);
VarHandle.releaseFence();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public boolean record(final Throwable observation)

final int offset = distinctObservation.offset;
buffer.getAndAddInt(offset + OBSERVATION_COUNT_OFFSET, 1);
buffer.putLongOrdered(offset + LAST_OBSERVATION_TIMESTAMP_OFFSET, timestampMs);
buffer.putLongRelease(offset + LAST_OBSERVATION_TIMESTAMP_OFFSET, timestampMs);

return true;
}
Expand Down Expand Up @@ -279,7 +279,7 @@ private DistinctObservation newObservation(final long timestampMs, final Throwab

final DistinctObservation distinctObservation = new DistinctObservation(observation, offset);
distinctObservations = prepend(distinctObservations, distinctObservation);
buffer.putIntOrdered(offset + LENGTH_OFFSET, length);
buffer.putIntRelease(offset + LENGTH_OFFSET, length);

return distinctObservation;
}
Expand Down
Loading

0 comments on commit 15ede48

Please sign in to comment.