Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provides clarification on the 1.9 rule #494

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public interface Publisher<T> {
| [:bulb:](#1.7 "1.7 explained") | *The intent of this rule is to make sure that onError and onComplete are the final states of an interaction between a Publisher and Subscriber pair.* |
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
| [:bulb:](#1.8 "1.8 explained") | *The intent of this rule is to make sure that Publishers respect a Subscriber’s request to cancel a Subscription when Subscription.cancel() has been called. The reason for **eventually** is because signals can have propagation delay due to being asynchronous.* |
| <a name="1.9">9</a> | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST [return normally](#term_return_normally), except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
| [:bulb:](#1.9 "1.9 explained") | *The intent of this rule is to make sure that `onSubscribe` is always signalled before any of the other signals, so that initialization logic can be executed by the Subscriber when the signal is received. Also `onSubscribe` MUST only be called at most once, [see [2.12](#2.12)]. If the supplied `Subscriber` is `null`, there is nowhere else to signal this but to the caller, which means a `java.lang.NullPointerException` must be thrown. Examples of possible situations: A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, or in a [terminal state](#term_terminal_state).* |
| <a name="1.9">9</a> | `Publisher.subscribe` MUST call `onSubscribe` [synchronously](#term_sync) on the provided `Subscriber`, and do so prior to any other signals to that `Subscriber`, and MUST [return normally](#term_return_normally), except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
| [:bulb:](#1.9 "1.9 explained") | *The intent of this rule is to make sure that `onSubscribe` is always signalled before any of the other signals and is called synchronusly with in the `subscribe` method, so that initialization logic can be executed by the Subscriber when the signal is received. Also `onSubscribe` MUST only be called at most once, [see [2.12](#2.12)]. If the supplied `Subscriber` is `null`, there is nowhere else to signal this but to the caller, which means a `java.lang.NullPointerException` must be thrown. Examples of possible situations: A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, or in a [terminal state](#term_terminal_state).* |
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
| [:bulb:](#1.10 "1.10 explained") | *The intent of this rule is to have callers of `subscribe` be aware that a generic Publisher and a generic Subscriber cannot be assumed to support being attached multiple times. Furthermore, it also mandates that the semantics of `subscribe` must be upheld no matter how many times it is called.* |
| <a name="1.11">11</a> | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import java.util.Iterator;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
* AsyncIterablePublisher is an implementation of Reactive Streams `Publisher`
Expand Down Expand Up @@ -59,7 +59,6 @@ public void subscribe(final Subscriber<? super T> s) {
// These represent the protocol of the `AsyncIterablePublishers` SubscriptionImpls
static interface Signal {};
enum Cancel implements Signal { Instance; };
enum Subscribe implements Signal { Instance; };
enum Send implements Signal { Instance; };
static final class Request implements Signal {
final long n;
Expand Down Expand Up @@ -87,7 +86,7 @@ final class SubscriptionImpl implements Subscription, Runnable {

// We are using this `AtomicBoolean` to make sure that this `Subscription` doesn't run concurrently with itself,
// which would violate rule 1.3 among others (no concurrent notifications).
private final AtomicBoolean on = new AtomicBoolean(false);
private final AtomicInteger wip = new AtomicInteger(0);

// This method will register inbound demand from our `Subscriber` and validate it against rule 3.9 and rule 3.17
private void doRequest(final long n) {
Expand Down Expand Up @@ -205,46 +204,47 @@ private void signal(final Signal signal) {

// This is the main "event loop" if you so will
@Override public final void run() {
if(on.get()) { // establishes a happens-before relationship with the end of the previous run
try {
final Signal s = inboundSignals.poll(); // We take a signal off the queue
if (!cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7
int remainingWork = 1;
for (;;) {

// Below we simply unpack the `Signal`s and invoke the corresponding methods
if (s instanceof Request)
doRequest(((Request)s).n);
else if (s == Send.Instance)
doSend();
else if (s == Cancel.Instance)
doCancel();
else if (s == Subscribe.Instance)
doSubscribe();
Signal s;
while ((s = inboundSignals.poll()) != null) {
if (cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7
return;
}
} finally {
on.set(false); // establishes a happens-before relationship with the beginning of the next run
if(!inboundSignals.isEmpty()) // If we still have signals to process
tryScheduleToExecute(); // Then we try to schedule ourselves to execute again

// Below we simply unpack the `Signal`s and invoke the corresponding methods
if (s instanceof Request)
doRequest(((Request) s).n);
else if (s == Send.Instance)
doSend();
else if (s == Cancel.Instance)
doCancel();
}

remainingWork = wip.addAndGet(-remainingWork); // establishes a happens-before relationship with the beginning of the next run
if (remainingWork == 0) {
return;
}
}
}

// This method makes sure that this `Subscription` is only running on one Thread at a time,
// this is important to make sure that we follow rule 1.3
private final void tryScheduleToExecute() {
if(on.compareAndSet(false, true)) {
try {
executor.execute(this);
} catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully
if (!cancelled) {
doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6
try {
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
} finally {
inboundSignals.clear(); // We're not going to need these anymore
// This subscription is cancelled by now, but letting it become schedulable again means
// that we can drain the inboundSignals queue if anything arrives after clearing
on.set(false);
}
if (wip.getAndIncrement() != 0) { // ensure happens-before with already running work
return;
}

try {
executor.execute(this);
} catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully
if (!cancelled) {
doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6
try {
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
} finally {
inboundSignals.clear(); // We're not going to need these anymore
}
}
}
Expand All @@ -263,7 +263,7 @@ private final void tryScheduleToExecute() {
// method is only intended to be invoked once, and immediately after the constructor has
// finished.
void init() {
signal(Subscribe.Instance);
doSubscribe();
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,11 @@ public void onSubscribe(Subscription s) {
concurrentAccessBarrier.enterSignal(signal);

subs = s;
subs.request(1);

concurrentAccessBarrier.leaveSignal(signal);

//request after leave signal since request may be offloaded so it is going to be false positive racing
subs.request(1);
}

@Override
Expand Down Expand Up @@ -1099,7 +1101,7 @@ public void onNext(T element) {
}
}
};
env.subscribe(pub, sub, env.defaultTimeoutMillis());
env.subscribe(pub, sub);

// eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)`
// we're pretty sure to overflow from those
Expand Down
14 changes: 3 additions & 11 deletions tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,28 +297,20 @@ public <T> T flopAndFail(String msg) {


public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
subscribe(pub, sub, defaultTimeoutMillis);
}

public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException {
pub.subscribe(sub);
sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub));
sub.subscription.expectCompletion(0, String.format("Could not subscribe %s to Publisher %s", sub, pub));
verifyNoAsyncErrorsNoDelay();
}

public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException {
ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this);
subscribe(pub, sub, defaultTimeoutMillis());
subscribe(pub, sub);
return sub;
}

public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException {
return newManualSubscriber(pub, defaultTimeoutMillis());
}

public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException {
ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this);
subscribe(pub, sub, timeoutMillis);
subscribe(pub, sub);
return sub;
}

Expand Down