Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multithreading and progress reporting together #73

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions include/vg/io/blocked_gzip_input_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ class BlockedGzipInputStream : public ::google::protobuf::io::ZeroCopyInputStrea
/// Make a new stream reading from the given C++ std::istream, wrapping it
/// in a BGZF. The stream must be at a BGZF block header, since the header
/// info is peeked.
BlockedGzipInputStream(std::istream& stream);
///
/// If thread_count is more than 1, enables multi-threaded BGZF decoding.
/// This needs to be part of the comstructor to ensure that it happens
/// before any data is read through the decoder.
BlockedGzipInputStream(std::istream& stream, size_t thread_count = 0);

/// Destroy the stream.
virtual ~BlockedGzipInputStream();
Expand Down Expand Up @@ -81,10 +85,6 @@ class BlockedGzipInputStream : public ::google::protobuf::io::ZeroCopyInputStrea
/// are operating on a non-blocked GZIP or uncompressed file.
virtual bool IsBGZF() const;

/// Turn on multithreaded decompression. Return true if successful and
/// false if the BGZF could not set up its thread pool.
virtual bool EnableMultiThreading(size_t thread_count);

/// Return true if the given istream looks like GZIP-compressed data (i.e.
/// has the GZIP magic number as its first two bytes). Replicates some of
/// the sniffing logic that htslib does, but puts back the sniffed
Expand Down
3 changes: 3 additions & 0 deletions include/vg/io/message_iterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class MessageIterator {
static string sniff_tag(::google::protobuf::io::ZeroCopyInputStream& stream);

/// Constructor to wrap a stream.
/// If thread_count is more than 1, enables multi-threaded BGZF decoding,
/// in which case no method on the stream may be called by anyone else
/// until the MessageIterator is destroyed!
MessageIterator(istream& in, bool verbose = false, size_t thread_count = 0);

/// Constructor to wrap an existing BGZF
Expand Down
5 changes: 4 additions & 1 deletion include/vg/io/protobuf_iterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ using namespace std;
template <typename T>
class ProtobufIterator {
public:
/// Constructor
/// Constructor. Uses single-threaded decoding, so methods may be called on
/// the given stream from other code while the ProtobufIterator exists, as
/// long as no ProtobufIterator method is running.
ProtobufIterator(istream& in);

///////////
Expand Down Expand Up @@ -117,6 +119,7 @@ class ProtobufIterator {
private:

/// Wrap a MessageIterator and just do Protobuf parsing on top of that.
/// We always use one that is single-threaded.
MessageIterator message_it;

/// We always maintain a parsed version of the current message.
Expand Down
22 changes: 19 additions & 3 deletions include/vg/io/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ void for_each(std::istream& in,
lambda(it.tell_group(), *it);

if (stream_length != std::numeric_limits<size_t>::max()) {
// Do progress
// Do progress.
// We know ProtobufIterator uses single-threaded decompression, so
// we can act on the stream directly.
progress(get_stream_position(in), stream_length);
}
}
Expand Down Expand Up @@ -193,6 +195,12 @@ void for_each_parallel_impl(std::istream& in,
// We do our own multi-threaded Protobuf decoding, but we batch up our
// strings by pulling them from this iterator, which we also
// multi-thread for decompression.
//
// Note that as long as this exists, we **may not** use the "in"
// stream! Even just to tell() it for the current position! This will
// start backgorund threads that use the stream, and on mac at least
// even a tellg() can mutate the stream internally and cause the
// background threads to segfault.
MessageIterator message_it(in, false, 8);

std::vector<std::string> *batch = nullptr;
Expand Down Expand Up @@ -305,8 +313,16 @@ void for_each_parallel_impl(std::istream& in,
}

if (stream_length != std::numeric_limits<size_t>::max()) {
// Do progress
progress(get_stream_position(in), stream_length);
// Do progress. But we can't use get_stream_position because we
// can't use the stream!
//
// We also can't get at htslib's bgzf_htell, which synchronizes
// with the real read threads but isn't exposed as a symbol.
//
// So we get the virtual offset and shift off the
// non-block-address bits so it is a real backing file offset,
// just with BGZF block resolution.
progress(message_it.tell_group()>>16, stream_length);
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/blocked_gzip_input_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace io {

using namespace std;

BlockedGzipInputStream::BlockedGzipInputStream(std::istream& stream) : handle(nullptr), byte_count(0),
BlockedGzipInputStream::BlockedGzipInputStream(std::istream& stream, size_t thread_count) : handle(nullptr), byte_count(0),
know_offset(false) {

// See where the stream is
Expand All @@ -30,6 +30,10 @@ BlockedGzipInputStream::BlockedGzipInputStream(std::istream& stream) : handle(nu
if (handle == nullptr) {
throw runtime_error("Unable to set up BGZF library on wrapped stream");
}

if (thread_count > 1 && bgzf_mt(handle, thread_count, 256) != 0) {
throw runtime_error("Unable to set up BGZF multi-threading");
}

if (file_start >= 0 && good && (bgzf_compression(handle) == 2 || bgzf_compression(handle) == 0)) {
// The stream we are wrapping is seekable, and the data is block-compressed or uncompressed
Expand Down Expand Up @@ -261,10 +265,6 @@ bool BlockedGzipInputStream::IsBGZF() const {
return handle->is_compressed && !handle->is_gzip;
}

bool BlockedGzipInputStream::EnableMultiThreading(size_t thread_count) {
return bgzf_mt(handle, thread_count, 256) == 0;
}

bool BlockedGzipInputStream::SmellsLikeGzip(std::istream& in) {
// TODO: We also assume that we can sniff the magic number bytes
// from the input stream and then put them both back. The C spec
Expand Down
9 changes: 2 additions & 7 deletions src/message_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,8 @@ string MessageIterator::sniff_tag(::google::protobuf::io::ZeroCopyInputStream& s
return tag;
}

MessageIterator::MessageIterator(istream& in, bool verbose, size_t thread_count) : MessageIterator(unique_ptr<BlockedGzipInputStream>(new BlockedGzipInputStream(in)), verbose) {
if (thread_count > 1) {
// After making the BGZF, turn on multithreaded decoding
if (!bgzip_in->EnableMultiThreading(thread_count)) {
throw std::runtime_error("Cound not enable multithreaded BGZF decoding");
}
}
MessageIterator::MessageIterator(istream& in, bool verbose, size_t thread_count) : MessageIterator(unique_ptr<BlockedGzipInputStream>(new BlockedGzipInputStream(in, thread_count)), verbose) {
// Nothing to do!
}

MessageIterator::MessageIterator(unique_ptr<BlockedGzipInputStream>&& bgzf, bool verbose) :
Expand Down