Describe the bug
With many producers and a blocking wait strategy, SequenceBarrier.waitFor(nextSequence) can return a value less than its input. Specifically, it can return nextSequence - 1.
To Reproduce
Run thisFails(). It calls testWithNumProducers, which fails if waitFor(nextSequence) returns a value less than its input.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;

class WaitForReproTest {

    @Test
    void thisFails() throws InterruptedException {
        testWithNumProducers(128);
    }

    @Test
    void thisWorks() throws InterruptedException {
        testWithNumProducers(3);
    }

    void testWithNumProducers(int numProducers) throws InterruptedException {
        WaitForRepro w = new WaitForRepro();
        w.startConsumer();
        w.startProducers(numProducers);
        Thread.sleep(4000);
        Assertions.assertFalse(w.failed());
    }

    static class WaitForRepro {
        private final RingBuffer<Integer> buffer;
        private final SequenceBarrier barrier;
        private volatile boolean failed;

        public WaitForRepro() {
            buffer = RingBuffer.createMultiProducer(() -> 2, 16, new BlockingWaitStrategy());
            barrier = buffer.newBarrier();
        }

        public void startConsumer() {
            Executors.newSingleThreadExecutor().submit(() -> {
                long nextSequence = 0;
                long availableSequence;
                while (true) {
                    availableSequence = barrier.waitFor(nextSequence);
                    // Record a failure if waitFor returned less than what was asked for.
                    if (nextSequence > availableSequence) {
                        fail();
                    }
                    while (nextSequence <= availableSequence) {
                        buffer.get(nextSequence++);
                    }
                    Thread.sleep(5);
                }
            });
        }

        public void startProducers(int numProducers) {
            ExecutorService executor = Executors.newFixedThreadPool(numProducers);
            for (int i = 0; i < numProducers; ++i) {
                executor.submit(() -> {
                    while (true) {
                        buffer.publishEvent((event, seq) -> {
                            // NOP
                        });
                        Thread.sleep(3);
                    }
                });
            }
        }

        private void fail() {
            failed = true;
        }

        public boolean failed() {
            return failed;
        }
    }
}
Expected behavior
I expected that with a BlockingWaitStrategy, waitFor(nextSequence) should block until it can return nextSequence or a value greater than it.
Desktop (please complete the following information):
OS: macOS
Version: 3.4.4 and 4.0.0
JVM Version: OpenJDK Runtime Environment Temurin-17.0.8+7 (build 17.0.8+7)
Additional context
I had opened a thread on Google Groups, but thought that this problem might be more easily addressed as a GitHub issue.
In 2.10.4, I did not observe this behavior.
When debugging, I found that the blocking wait strategy returns a value greater than or equal to sequence on line 56 of waitFor, but on line 63, sequencer.getHighestPublishedSequence returns sequence - 1.
https://github.com/LMAX-Exchange/disruptor/blob/3.4.4/src/main/java/com/lmax/disruptor/ProcessingSequenceBarrier.java#L51
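To illustrate how a sequence - 1 result can arise, here is a simplified stand-alone model, not the Disruptor source. It assumes (my reading of the multi-producer sequencer, not something verified in this report) that the cursor advances when a producer claims a slot, while a separate per-slot flag is set only when that slot is published. The class name PublishedScanModel, the method highestPublished, and the boolean[] flags are hypothetical names used only for this sketch.

```java
/** Simplified model of a "highest published" scan; not the Disruptor source. */
public class PublishedScanModel {

    /**
     * Scans [lowerBound, claimed] and returns the last sequence for which every
     * sequence from lowerBound onward is published. If lowerBound itself is not
     * yet published, the result is lowerBound - 1.
     */
    public static long highestPublished(long lowerBound, long claimed, boolean[] published) {
        for (long seq = lowerBound; seq <= claimed; seq++) {
            if (!published[(int) seq]) {
                return seq - 1;
            }
        }
        return claimed;
    }

    public static void main(String[] args) {
        boolean[] published = new boolean[16];
        // Sequences 0..4 were published earlier.
        for (int i = 0; i <= 4; i++) {
            published[i] = true;
        }
        // Producers have claimed 5..7, but only 6 and 7 are published so far.
        published[6] = true;
        published[7] = true;

        long nextSequence = 5;
        long claimed = 7; // what the wait strategy would report in this model

        // The claimed cursor is >= nextSequence, yet the scan stops at 5.
        System.out.println(highestPublished(nextSequence, claimed, published)); // prints 4

        // Once the slow producer publishes 5, the whole range is visible.
        published[5] = true;
        System.out.println(highestPublished(nextSequence, claimed, published)); // prints 7
    }
}
```

Under this model, more producers means more claimed-but-unpublished slots at any instant, which would match the failure appearing with 128 producers but not with 3.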
In 2.10.4, waitFor simply defers to the wait strategy, so sequence - 1 can never be returned with a blocking wait strategy.
https://github.com/LMAX-Exchange/disruptor/blob/2.10.4/code/src/main/com/lmax/disruptor/ProcessingSequenceBarrier.java#L40
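As a possible consumer-side guard, the call can be retried until the result reaches the requested sequence. The sketch below is only an assumption about how one might work around the behavior, not a recommendation from the library. The helper waitAtLeast and the class WaitAtLeast are hypothetical, and the barrier is modelled as a LongUnaryOperator so the example runs without the Disruptor on the classpath; the real SequenceBarrier.waitFor throws checked exceptions, so a real version would need to declare or wrap them.

```java
import java.util.function.LongUnaryOperator;

/** Hypothetical retry wrapper around a waitFor-style call. */
public class WaitAtLeast {

    /**
     * Calls waitFor(target) repeatedly until it returns a value >= target,
     * then returns that value.
     */
    public static long waitAtLeast(LongUnaryOperator waitFor, long target) {
        long available = waitFor.applyAsLong(target);
        while (available < target) {
            Thread.onSpinWait(); // hint to the runtime that we are busy-waiting
            available = waitFor.applyAsLong(target);
        }
        return available;
    }

    public static void main(String[] args) {
        // Stub barrier: returns target - 1 on the first two calls, then target + 2.
        int[] calls = {0};
        LongUnaryOperator stubBarrier = seq -> (calls[0]++ < 2) ? seq - 1 : seq + 2;

        System.out.println(waitAtLeast(stubBarrier, 10)); // prints 12
        System.out.println(calls[0]);                     // prints 3
    }
}
```

Note that this spins rather than blocks between retries, so it trades the blocking behavior I expected for correctness of the returned value.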