Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AtomicBuffer acquire/release methods #314

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,16 +284,16 @@ private void testAlignedAtomicMethods(final AtomicBuffer buffer, final int offse
buffer.compareAndSetLong(offset + SIZE_OF_LONG, Long.MAX_VALUE, Long.MAX_VALUE);
buffer.getAndAddLong(offset + SIZE_OF_LONG, Long.MAX_VALUE);
buffer.getAndSetLong(offset + SIZE_OF_LONG, Long.MAX_VALUE);
buffer.putLongOrdered(offset + SIZE_OF_LONG, Long.MAX_VALUE);
buffer.addLongOrdered(offset + SIZE_OF_LONG, Long.MAX_VALUE);
buffer.putLongRelease(offset + SIZE_OF_LONG, Long.MAX_VALUE);
buffer.addLongRelease(offset + SIZE_OF_LONG, Long.MAX_VALUE);

buffer.getIntVolatile(offset + SIZE_OF_INT);
buffer.putIntVolatile(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.compareAndSetInt(offset + SIZE_OF_INT, Integer.MAX_VALUE, Integer.MAX_VALUE);
buffer.getAndAddInt(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.getAndSetInt(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.putIntOrdered(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.addIntOrdered(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.putIntRelease(offset + SIZE_OF_INT, Integer.MAX_VALUE);
buffer.addIntRelease(offset + SIZE_OF_INT, Integer.MAX_VALUE);

buffer.getShortVolatile(offset + SIZE_OF_SHORT);
buffer.putShortVolatile(offset + SIZE_OF_SHORT, Short.MAX_VALUE);
Expand All @@ -319,17 +319,17 @@ private void testUnAlignedAtomicMethods(final AtomicBuffer buffer, final int off
addressOffset, offset + SIZE_OF_INT, (i) -> buffer.compareAndSetLong(i, Long.MAX_VALUE, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.getAndAddLong(i, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.getAndSetLong(i, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.putLongOrdered(i, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.addLongOrdered(i, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.putLongRelease(i, Long.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_INT, (i) -> buffer.addLongRelease(i, Long.MAX_VALUE));

assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, buffer::getIntVolatile);
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntVolatile(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT,
(i) -> buffer.compareAndSetInt(i, Integer.MAX_VALUE, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.getAndAddInt(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.getAndSetInt(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntOrdered(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.addIntOrdered(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.putIntRelease(i, Integer.MAX_VALUE));
assertUnaligned(addressOffset, offset + SIZE_OF_SHORT, (i) -> buffer.addIntRelease(i, Integer.MAX_VALUE));

assertUnaligned(addressOffset, offset + SIZE_OF_BYTE, buffer::getShortVolatile);
assertUnaligned(addressOffset, offset + SIZE_OF_BYTE, (i) -> buffer.putShortVolatile(i, Short.MAX_VALUE));
Expand Down
12 changes: 12 additions & 0 deletions agrona-concurrency-tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
To run the jcstress concurrency tests directly without Gradle, do the following:

```
./gradlew ./gradlew :agrona-concurrency-tests:shadowJar
```
The above will build the appropriate JCStress binary.

And then:
```
java -jar agrona-concurrency-tests/build/libs/concurrency-tests.jar YourTest -jvmArgsPrepend "--add-exports java.base/jdk.internal.misc=ALL-UNNAMED"
```
Make sure to replace the `YourTest` by the appropriate JCStress test.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.III_Result;
import org.openjdk.jcstress.infra.results.I_Result;
import org.openjdk.jcstress.infra.results.JJJ_Result;
import org.openjdk.jcstress.infra.results.J_Result;
import org.openjdk.jcstress.infra.results.*;

import static java.nio.ByteBuffer.allocateDirect;

Expand All @@ -38,14 +35,97 @@ public class UnsafeBufferTests
{
}

/**
* Test that verifies that the {@link AtomicBuffer#putLongRelease(int, long)} and
* {@link AtomicBuffer#getLongAcquire(int)} provide causal consistency.
* <p>
* For causal consistency and 2 variables A and B, if first A=1 and then B=1,
* and another thread reads B=1, then it should also read A=1.
*/
@JCStressTest
// first value is B, second value is A.
@Outcome(id = "0, 1", expect = Expect.ACCEPTABLE)
@Outcome(id = "0, 0", expect = Expect.ACCEPTABLE)
@Outcome(id = "1, 1", expect = Expect.ACCEPTABLE)
@Outcome(id = "1, 0", expect = Expect.FORBIDDEN)
@State
public static class ReleaseAcquireLong
{
private static final int A = 0;
private static final int B = 8;
private final UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(16, 8));

/**
* Release thread.
*/
@Actor
public void release()
{
buffer.putLong(A, 1);
buffer.putLongRelease(B, 1);
}

/**
* Acquire thread.
*
* @param result object.
*/
@Actor
public void acquire(final JJ_Result result)
{
result.r1 = buffer.getLongAcquire(B);
result.r2 = buffer.getLong(A);
}
}

/**
* Test that verifies that the {@link AtomicBuffer#putIntRelease(int, int)} and
* {@link AtomicBuffer#getIntAcquire(int)} provide causal consistency. For more
* info see {@link ReleaseAcquireLong}.
*/
@JCStressTest
@Outcome(id = "0, 1", expect = Expect.ACCEPTABLE)
@Outcome(id = "0, 0", expect = Expect.ACCEPTABLE)
@Outcome(id = "1, 1", expect = Expect.ACCEPTABLE)
@Outcome(id = "1, 0", expect = Expect.ACCEPTABLE)
@State
public static class ReleaseAcquireInt
{
private static final int A = 0;
private static final int B = 4;
private final UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(8, 4));

/**
* Writer thread.
*/
@Actor
public void release()
{
buffer.putInt(A, 1);
buffer.putIntRelease(B, 1);
}

/**
* Reader thread.
*
* @param result object.
*/
@Actor
public void acquire(final JJ_Result result)
{
result.r1 = buffer.getIntAcquire(B);
result.r2 = buffer.getInt(A);
}
}

/**
* Test that verifies the atomicity of the {@link UnsafeBuffer#putLongVolatile(int, long)},
* {@link UnsafeBuffer#putLongOrdered(int, long)} and {@link UnsafeBuffer#getLongVolatile(int)}.
* {@link UnsafeBuffer#putLongRelease(int, long)} (int, long)} and {@link UnsafeBuffer#getLongVolatile(int)}.
*/
@JCStressTest
@Outcome(id = "0", expect = Expect.ACCEPTABLE, desc = "read before writes")
@Outcome(id = "-1", expect = Expect.ACCEPTABLE, desc = "putLongVolatile before read")
@Outcome(id = "9223372036854775806", expect = Expect.ACCEPTABLE, desc = "putLongOrdered before read")
@Outcome(id = "9223372036854775806", expect = Expect.ACCEPTABLE, desc = "putLongRelease before read")
@State
public static class DirectBufferLong
{
Expand All @@ -69,9 +149,9 @@ public void putLongVolatile()
* Writer thread.
*/
@Actor
public void putLongOrdered()
public void putLongRelease()
{
buffer.putLongOrdered(WRITE_INDEX, Long.MAX_VALUE - 1);
buffer.putLongRelease(WRITE_INDEX, Long.MAX_VALUE - 1);
}

/**
Expand All @@ -88,7 +168,7 @@ public void actor2(final J_Result result)

/**
* Test that verifies the atomicity of the {@link UnsafeBuffer#putIntVolatile(int, int)},
* {@link UnsafeBuffer#putIntOrdered(int, int)} and {@link UnsafeBuffer#getIntVolatile(int)}.
* {@link UnsafeBuffer#putIntRelease(int, int)} (int, int)} and {@link UnsafeBuffer#getIntVolatile(int)}.
*/
@JCStressTest
@Outcome(id = "0", expect = Expect.ACCEPTABLE, desc = "read before writes")
Expand Down Expand Up @@ -117,9 +197,9 @@ public void putIntVolatile()
* Writer thread.
*/
@Actor
public void putIntOrdered()
public void putIntRelease()
{
buffer.putIntOrdered(WRITE_INDEX, 222222222);
buffer.putIntRelease(WRITE_INDEX, 222222222);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions agrona/src/main/java/org/agrona/MarkFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void close()
*/
public void signalReady(final int version)
{
buffer.putIntOrdered(versionFieldOffset, version);
buffer.putIntRelease(versionFieldOffset, version);
}

/**
Expand Down Expand Up @@ -289,7 +289,7 @@ public int versionWeak()
*/
public void timestampOrdered(final long timestamp)
{
buffer.putLongOrdered(timestampFieldOffset, timestamp);
buffer.putLongRelease(timestampFieldOffset, timestamp);
}

/**
Expand Down
85 changes: 80 additions & 5 deletions agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@

/**
* Abstraction over a range of buffer types that allows type to be accessed with various memory ordering semantics.
* <p>
* Before Java 9, there was no naming standard for stores with release semantics. On the AtomicLong there was the
* {@link java.util.concurrent.atomic.AtomicLong#lazySet(long)}. Because there was no standard, the AtomicBuffer
* has methods like {@link #putLongOrdered(int, long)}. With Java 9, the 'release' name has been introduced.
* The AtomicBuffer also has methods with release methods which are identical to the ordered methods. All the
* methods with 'ordered' name will call the equivalent method with release name. This introduces a small
* performance penalty for the older methods and this should encourage users to switch to the newer methods.
*/
public interface AtomicBuffer extends MutableDirectBuffer
{
Expand Down Expand Up @@ -53,8 +60,8 @@ public interface AtomicBuffer extends MutableDirectBuffer
* Verify that the underlying buffer is correctly aligned to prevent word tearing, other ordering issues and
* the JVM crashes. In particular this method verifies that the starting offset of the underlying buffer is properly
* aligned. However, the actual atomic call must ensure that the index is properly aligned, i.e. it must be aligned
* to the size of the operand. For example a call to any of the following methods {@link #putIntOrdered(int, int)},
* {@link #putIntVolatile(int, int)}, {@link #addIntOrdered(int, int)}, {@link #getIntVolatile(int)},
* to the size of the operand. For example a call to any of the following methods {@link #putIntRelease(int, int)},
* {@link #putIntVolatile(int, int)}, {@link #addIntRelease(int, int)} (int, int)}, {@link #getIntVolatile(int)},
* {@link #getAndAddInt(int, int)} or {@link #getAndSetInt(int, int)}, must have the index aligned by four bytes
* (e.g. {@code 0, 4, 8, 12, 60 etc.}).
* <p>
Expand Down Expand Up @@ -88,6 +95,15 @@ public interface AtomicBuffer extends MutableDirectBuffer
*/
long getLongVolatile(int index);

/**
* Atomically get the value at a given index with acquire semantics.
*
* @param index in bytes from which to get.
* @return the value for at a given index.
* @since 2.1.0
*/
long getLongAcquire(int index);

/**
* Atomically put a value to a given index with volatile semantics.
* <p>
Expand All @@ -101,24 +117,49 @@ public interface AtomicBuffer extends MutableDirectBuffer
/**
* Atomically put a value to a given index with ordered store semantics.
* <p>
* This call has release semantics.
* Instead of using this method, use {@link #putLongRelease(int, long)} instead. They
* are identical and the putLongRelease is the preferred version.
*
* @param index in bytes for where to put.
* @param value for at a given index.
*/
void putLongOrdered(int index, long value);

/**
* Atomically put a value to a given index with release semantics.
*
* @param index in bytes for where to put.
* @param value for at a given index.
* @since 2.1.0
*/
void putLongRelease(int index, long value);

/**
* Atomically adds a value to a given index with ordered store semantics. Use a negative increment to decrement.
* <p>
* The load has no ordering semantics. The store has release semantics.
* <p>
* Instead of using this method, use {@link #addLongRelease(int, long)} instead. They
* are identical but the addLongRelease is the preferred version.
*
* @param index in bytes for where to put.
* @param increment by which the value at the index will be adjusted.
* @return the previous value at the index.
*/
long addLongOrdered(int index, long increment);

/**
* Atomically adds a value to a given index with ordered store semantics. Use a negative increment to decrement.
* <p>
* The load has no ordering semantics. The store has release semantics.
*
* @param index in bytes for where to put.
* @param increment by which the value at the index will be adjusted.
* @return the previous value at the index.
* @since 2.1.0
*/
long addLongRelease(int index, long increment);

/**
* Atomic compare and set of a long given an expected value.
* <p>
Expand Down Expand Up @@ -174,27 +215,61 @@ public interface AtomicBuffer extends MutableDirectBuffer
*/
void putIntVolatile(int index, int value);

/**
* Atomically get the value at a given index with acquire semantics.
*
* @param index in bytes from which to get.
* @return the value for at a given index.
* @since 2.1.0
*/
int getIntAcquire(int index);

/**
* Atomically put a value to a given index with ordered semantics.
* <p>
* This call has release semantics.
*
* Instead of using this method, use {@link #putIntRelease} instead. They
* are identical but the putIntRelease is the preferred version.

* @param index in bytes for where to put.
* @param value for at a given index.
*/
void putIntOrdered(int index, int value);

/**
* Atomically put a value to a given index with release semantics.
*
* @param index in bytes for where to put.
* @param value for at a given index.
* @since 2.1.0
*/
void putIntRelease(int index, int value);

/**
* Atomically add a value to a given index with ordered store semantics. Use a negative increment to decrement.
* <p>
* The load has no ordering semantics. The store has release semantics.
* <p>
* Instead of using this method, use {@link #addIntRelease(int, int)} instead. They
* are identical but the addIntRelease is the preferred version.
*
* @param index in bytes for where to put.
* @param increment by which the value at the index will be adjusted.
* @return the previous value at the index.
*/
int addIntOrdered(int index, int increment);

/**
* Atomically add a value to a given index with release semantics. Use a negative increment to decrement.
* <p>
* The load has no ordering semantics. The store has release semantics.
*
* @param index in bytes for where to put.
* @param increment by which the value at the index will be adjusted.
* @return the previous value at the index.
* @since 2.1.0
*/
int addIntRelease(int index, int increment);

/**
* Atomic compare and set of an int given an expected value.
* <p>
Expand Down
Loading