Skip to content

Commit

Permalink
Fixed scheduler leak and lack of backpressure support.
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jun 5, 2015
1 parent 7cc5b11 commit 79a35a5
Showing 1 changed file with 15 additions and 26 deletions.
41 changes: 15 additions & 26 deletions src/main/java/rx/observable/ListenableFutureObservable.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observable;

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.Executor;

import rx.Observable;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.functions.Action0;
import rx.Observer;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.functions.Action0;
import rx.internal.operators.SingleDelayedProducer;

import java.util.concurrent.Executor;
import com.google.common.util.concurrent.*;

public class ListenableFutureObservable {

Expand All @@ -45,7 +27,11 @@ public void execute(final Runnable command) {
worker.schedule(new Action0() {
@Override
public void call() {
command.run();
try {
command.run();
} finally {
worker.unsubscribe();
}
}
});
}
Expand All @@ -63,18 +49,21 @@ public static <T> Observable<T> from(final ListenableFuture<T> future, final Exe
return Observable.create(new OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
final SingleDelayedProducer<T> sdp = new SingleDelayedProducer<T>(subscriber);
subscriber.setProducer(sdp);

future.addListener(new Runnable() {
@Override
public void run() {
try {
T t = future.get();
subscriber.onNext(t);
subscriber.onCompleted();
sdp.set(t);
} catch (Exception e) {
subscriber.onError(e);
}
}
}, executor);

}
});
}
Expand Down

0 comments on commit 79a35a5

Please sign in to comment.