Resource Manager #18

Open
abersnaze opened this issue Sep 4, 2016 · 6 comments

@abersnaze

I have this bit of code that I've used a couple of times, and I figured it could use a good home. It's a class for currying the resource creation and cleanup arguments of the rx.Observable#using function. Would it be a good fit for this project?

import rx.Observable;
import rx.functions.Func1;

public class ResourceManager<T> {
    public static interface CheckedFunc0<R> {
        public R call() throws Exception;
    }

    public static interface CheckedAction1<T> {
        public void call(T t) throws Exception;
    }

    private CheckedFunc0<T> resourceFactory;
    private CheckedAction1<? super T> disposeAction;

    public ResourceManager(final CheckedFunc0<T> resourceFactory, final CheckedAction1<? super T> disposeAction) {
        this.resourceFactory = resourceFactory;
        this.disposeAction = disposeAction;
    }

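    /**
     * Makes the resource T available to the function passed in until the {@link Observable} returned
     * terminates or is unsubscribed from, at which point the dispose action is called.
     *
     * @param func function that uses the resource and returns an {@link Observable}
     * @return an {@link Observable} that creates the resource on subscription and disposes of it when done
     */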
    public final <R> Observable<R> checkout(final Func1<? super T, ? extends Observable<? extends R>> func) {
        return Observable.using(() -> {
            try {
                return resourceFactory.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } , func, (rsrc) -> {
            try {
                disposeAction.call(rsrc);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}
@davidmoten
Owner

davidmoten commented Sep 4, 2016

Hi George
Happy to add that. So this is an example usage:

Observable<byte[]> bytes = new ResourceManager<InputStream>(
        () -> new FileInputStream(file),
        is -> is.close())
    .checkout(is -> bytesFrom(is));

Just for comparison, I normally use:

Observable<byte[]> bytes =
    Observable.using(
        Checked.f0(() -> new FileInputStream(file)),
        is -> bytesFrom(is), 
        Checked.a1(is -> is.close()));

I can certainly imagine the currying aspect of your offering being useful.

Now that we are discussing it, another overload of Observable.using in RxJava 1 and 2 might be useful for resources that implement Closeable (I frequently use using with InputStreams and Sockets). Do you think that would be worthwhile? By the same token, your ResourceManager could take an overloaded constructor for such resources to make usage briefer.
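
As a rough illustration of the briefer construction for closeable resources, here is a minimal sketch. To stay self-contained it does not depend on RxJava: checkout runs a plain function synchronously and then disposes of the resource, standing in for what Observable.using does on termination or unsubscribe. The class name SimpleResourceManager and the factory name forCloseable are hypothetical and not part of RxJava or this project.

```java
import java.util.function.Function;

// Hypothetical, RxJava-free stand-in for ResourceManager:
// checkout runs synchronously instead of returning an Observable.
final class SimpleResourceManager<T> {
    interface CheckedFunc0<R> {
        R call() throws Exception;
    }

    interface CheckedAction1<T> {
        void call(T t) throws Exception;
    }

    private final CheckedFunc0<T> resourceFactory;
    private final CheckedAction1<? super T> disposeAction;

    SimpleResourceManager(CheckedFunc0<T> resourceFactory, CheckedAction1<? super T> disposeAction) {
        this.resourceFactory = resourceFactory;
        this.disposeAction = disposeAction;
    }

    // Assumed convenience factory: for AutoCloseable resources the dispose
    // action defaults to close(), so callers only supply the factory.
    static <C extends AutoCloseable> SimpleResourceManager<C> forCloseable(CheckedFunc0<C> resourceFactory) {
        return new SimpleResourceManager<>(resourceFactory, AutoCloseable::close);
    }

    // Creates the resource, applies func to it, and always disposes of it afterwards.
    <R> R checkout(Function<? super T, ? extends R> func) {
        T resource;
        try {
            resource = resourceFactory.call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        try {
            return func.apply(resource);
        } finally {
            try {
                disposeAction.call(resource);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
```

With such a factory, the file example would shrink to something like SimpleResourceManager.forCloseable(() -> new FileInputStream(file)), with no explicit close action at the call site.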

@davidmoten
Owner

@abersnaze Do you want to make a PR or would you like me to pop it in?

@abersnaze
Author

The direct comparison makes it hard to see, but separating resource management from resource usage makes the user code simpler:

<R> Observable<R> doStuff(ResourceManager<File> tmpFileGenerator) {
    return tmpFileGenerator.checkout(tmpFile -> {
        // do stuff with a temporary file that gets automatically cleaned up when done.
        return ...;
    });
}

I think this API could grow a few more options, such as resource pooling and fallbacks when a resource is not available.

I can submit a PR too.

@sirinath

I am looking for a way to close or dispose of a set of resources emitted through an observable, that is, to call the close method on each item, but only once every consumer is finished with it. By "finished" I mean the item can no longer reach any downstream onNext, is no longer held by a window or group, and is not referenced by any other downstream subject or subscription; in other words, no observable or operator sequence still uses it.

@davidmoten
Owner

Sounds like you need to look at

Observable.using (...).share();


@sirinath

I am looking for reference counting of the contained objects. For example, I have a set of objects that allocate native resources.

This is not exactly what I am using, but it should give you a sense of it, though in my case the resource is not a file as in this example:

Subject<File> s = ...

s.onNext(thefile1);
s.onNext(thefile2);
...
s.onNext(thefileN);

Once every consumer is done with thefileX, I want to close it.

The Observable.using(...).share() approach covers the case where you have one resource from which you get a stream of items; it does not necessarily cover a stream of resources.
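
One way to read this request is as manual reference counting around each emitted item. The sketch below assumes a hypothetical RefCounted wrapper (not part of RxJava or rxjava-extras): the creator holds the first reference, each additional consumer calls retain() before using the resource, and the last release() closes it. Where those calls would sit in an operator chain is left open here.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: closes the wrapped resource when the last holder releases it.
final class RefCounted<T extends AutoCloseable> {
    private final T resource;
    // Starts at 1: the creator holds the first reference.
    private final AtomicInteger refs = new AtomicInteger(1);

    RefCounted(T resource) {
        this.resource = resource;
    }

    // Called by each additional consumer before it starts using the resource.
    T retain() {
        while (true) {
            int n = refs.get();
            if (n == 0) {
                throw new IllegalStateException("resource already closed");
            }
            if (refs.compareAndSet(n, n + 1)) {
                return resource;
            }
        }
    }

    // Called by each holder when finished; the release that drops the count to zero closes the resource.
    void release() {
        while (true) {
            int n = refs.get();
            if (n == 0) {
                throw new IllegalStateException("released more times than retained");
            }
            if (refs.compareAndSet(n, n - 1)) {
                if (n == 1) {
                    try {
                        resource.close();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                return;
            }
        }
    }
}
```

A subject could then emit RefCounted<File>-style wrappers instead of the raw items, at the cost of every consumer having to pair its retain() with a release().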
