
Atomic read and write many #4

Open
jberryman opened this issue Jul 12, 2014 · 6 comments

Comments

@jberryman
Owner

It would be straightforward to add a function like:

readManyChan :: Int -> OutChan a -> IO [a]

that atomically reads n consecutive messages, blocking until all are present. This would be faster than replicateM n (readChan c), and might be useful if you need to interleave readers that each take consecutive chunks of n elements for whatever reason.
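
For comparison, a sketch of the non-atomic version this would improve on (using unagi-chan's readChan; with concurrent readers the n messages read this way need not be consecutive):

import Control.Monad (replicateM)
import Control.Concurrent.Chan.Unagi (OutChan, readChan)

-- n independent reads: other readers can interleave between them,
-- so the results are not guaranteed to be consecutive messages.
readManyNaive :: Int -> OutChan a -> IO [a]
readManyNaive n c = replicateM n (readChan c)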

Please comment if you need this.

@jfischoff
Contributor

This would be useful for me, assuming it is faster.

@twittner

I would be interested in something similar. Rather than blocking until all n items are present, I would like to see a variant that reads all available items up to some limit n and blocks only if no item is available. Also, I do not need preservation of insertion order with concurrent consumers.

@jberryman
Owner Author

Thanks for the input! Hmm, I'm not sure what you want is possible with the current implementation: a readChanN n would basically need to return having taken on an obligation for the next n consecutive elements. Would it work for you to have a function that returns some sort of IO action yielding those n elements as they become available, as you describe?

@jberryman jberryman changed the title from "Atomic read many" to "Atomic read and write many" on Apr 15, 2015
@jberryman
Owner Author

@twittner this may be relevant to your needs: shortly I'm going to release a change to Element so that it allows both a non-blocking and a blocking read. This is to support a use case like:

  • take as many elements as are available, without blocking
    • you'll be left with the last Element that returned Nothing on tryRead
  • batch them and do something
  • continue working from that one Element that was pending, by doing a possibly-blocking read

Without that capability we'd be stuck busy-waiting.
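
A minimal sketch of that loop, assuming the new API has roughly this shape (names per the planned change, not a released API):

tryReadChan :: OutChan a -> IO (Element a, IO a)
newtype Element a = Element { tryRead :: IO (Maybe a) }

where the second component of the pair is a blocking read of the same element:

-- Drain everything already available without blocking, returning the
-- batch along with a blocking read for the pending element we stopped at.
readAvailable :: OutChan a -> IO ([a], IO a)
readAvailable c = go []
  where
    go acc = do
      (el, blockingRead) <- tryReadChan c
      mx <- tryRead el
      case mx of
        Just x  -> go (x : acc)                       -- ready: keep draining
        Nothing -> return (reverse acc, blockingRead) -- pending: stop here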

@jberryman
Owner Author

I think the interface for batched writes will be polymorphic, and look like:

writeChanMany :: Batch a => InChan -> a -> IO ()

where Batch instances will be tuples of different sizes ((,), (,,), etc.) and arrays. This makes a number of hairy details go away.
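
A hedged sketch of what that class could look like (the Elem associated type, the foldBatch method, and these instances are assumptions, not a settled design; a real writeChanMany would claim all the write indices in one atomic increment rather than issue individual writes):

{-# LANGUAGE TypeFamilies, FlexibleInstances #-}

import Control.Concurrent.Chan.Unagi (InChan, writeChan)

-- Hypothetical class for the polymorphic batched-write interface.
class Batch b where
  type Elem b
  -- Visit each element of the batch, left to right.
  foldBatch :: (Elem b -> IO ()) -> b -> IO ()

instance Batch (a, a) where
  type Elem (a, a) = a
  foldBatch f (x, y) = f x >> f y

instance Batch (a, a, a) where
  type Elem (a, a, a) = a
  foldBatch f (x, y, z) = f x >> f y >> f z

-- Element-by-element stand-in; only illustrates the interface, not
-- the atomicity the real implementation would provide.
writeChanMany :: Batch b => InChan (Elem b) -> b -> IO ()
writeChanMany c = foldBatch (writeChan c)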

readChanMany will probably return a mock array, which is really just:

newtype Array a = Array { indexArray :: Int -> IO a }

That would just hold on to one or several array segments, but should allow later segments to be GC'd, just as we'd like.
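
Purely illustrative usage, given the newtype above (the batch size n would presumably come back from the hypothetical readChanMany itself):

-- Visit the n elements of a batch in order via the mock Array.
forBatch :: Int -> Array a -> (a -> IO ()) -> IO ()
forBatch n (Array ix) f = mapM_ (\i -> ix i >>= f) [0 .. n - 1]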

@jberryman
Owner Author

A simpler, and maybe more useful, function is flushChan (named after TQueue's flushTQueue), since we never need to block. The machinery might only be in place in the Bounded variant, since we need to read the write-end counter. We would use a CAS loop on the read-end count to avoid claiming too many elements during a race (would this need a timeout? otherwise livelock is possible).
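
A rough sketch of that CAS loop using the counters from atomic-primops (the two-counter layout is an assumption about the Bounded internals, and segment bookkeeping is ignored):

import Data.Atomics.Counter (AtomicCounter, readCounter,
                             readCounterForCAS, casCounter, peekCTicket)

-- Atomically claim everything between the read- and write-end counters,
-- retrying if another reader races us. Returns the half-open range
-- [lo, hi) of indices we now own; empty if nothing is available, so we
-- never block.
claimAll :: AtomicCounter -> AtomicCounter -> IO (Int, Int)
claimAll readEnd writeEnd = loop =<< readCounterForCAS readEnd
  where
    loop tick = do
      hi <- readCounter writeEnd        -- snapshot of the write frontier
      let lo = peekCTicket tick
      if lo >= hi
        then return (lo, lo)            -- nothing available right now
        else do
          (ok, tick') <- casCounter readEnd tick hi
          if ok then return (lo, hi)    -- we own [lo, hi)
                else loop tick'         -- lost the race; retry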

In what form should we return the data? A chunked stream/array, something like a ByteString.Lazy analogue for vector, would be ideal, since we could return it without copying, but I don't think such a type exists, unfortunately.
