Skip to content

Commit

Permalink
Merge pull request #14 from abersnaze/FixConversions
Browse files Browse the repository at this point in the history
Fixed scheduler leak and lack of backpressure support
  • Loading branch information
abersnaze committed Jun 5, 2015
2 parents 7cc5b11 + ad5ad00 commit 0a2f070
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/main/java/rx/observable/ListenableFutureObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.functions.Action0;
import rx.internal.operators.SingleDelayedProducer;
import rx.Observer;
import rx.Scheduler;
import rx.Scheduler.Worker;
Expand All @@ -45,7 +46,11 @@ public void execute(final Runnable command) {
worker.schedule(new Action0() {
@Override
public void call() {
command.run();
try {
command.run();
} finally {
worker.unsubscribe();
}
}
});
}
Expand All @@ -63,18 +68,21 @@ public static <T> Observable<T> from(final ListenableFuture<T> future, final Exe
return Observable.create(new OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
final SingleDelayedProducer<T> sdp = new SingleDelayedProducer<T>(subscriber);
subscriber.setProducer(sdp);

future.addListener(new Runnable() {
@Override
public void run() {
try {
T t = future.get();
subscriber.onNext(t);
subscriber.onCompleted();
sdp.set(t);
} catch (Exception e) {
subscriber.onError(e);
}
}
}, executor);

}
});
}
Expand Down

0 comments on commit 0a2f070

Please sign in to comment.