diff --git a/include/vg/io/blocked_gzip_input_stream.hpp b/include/vg/io/blocked_gzip_input_stream.hpp index d75182f..e56756c 100644 --- a/include/vg/io/blocked_gzip_input_stream.hpp +++ b/include/vg/io/blocked_gzip_input_stream.hpp @@ -24,7 +24,11 @@ class BlockedGzipInputStream : public ::google::protobuf::io::ZeroCopyInputStrea /// Make a new stream reading from the given C++ std::istream, wrapping it /// in a BGZF. The stream must be at a BGZF block header, since the header /// info is peeked. - BlockedGzipInputStream(std::istream& stream); + /// + /// If thread_count is more than 1, enables multi-threaded BGZF decoding. + /// This needs to be part of the comstructor to ensure that it happens + /// before any data is read through the decoder. + BlockedGzipInputStream(std::istream& stream, size_t thread_count = 0); /// Destroy the stream. virtual ~BlockedGzipInputStream(); @@ -81,10 +85,6 @@ class BlockedGzipInputStream : public ::google::protobuf::io::ZeroCopyInputStrea /// are operating on a non-blocked GZIP or uncompressed file. virtual bool IsBGZF() const; - /// Turn on multithreaded decompression. Return true if successful and - /// false if the BGZF could not set up its thread pool. - virtual bool EnableMultiThreading(size_t thread_count); - /// Return true if the given istream looks like GZIP-compressed data (i.e. /// has the GZIP magic number as its first two bytes). Replicates some of /// the sniffing logic that htslib does, but puts back the sniffed diff --git a/include/vg/io/message_iterator.hpp b/include/vg/io/message_iterator.hpp index d086434..0827b74 100644 --- a/include/vg/io/message_iterator.hpp +++ b/include/vg/io/message_iterator.hpp @@ -72,6 +72,9 @@ class MessageIterator { static string sniff_tag(::google::protobuf::io::ZeroCopyInputStream& stream); /// Constructor to wrap a stream. + /// If thread_count is more than 1, enables multi-threaded BGZF decoding, + /// in which case no method on the stream may be called by anyone else + /// until the MessageIterator is destroyed! MessageIterator(istream& in, bool verbose = false, size_t thread_count = 0); /// Constructor to wrap an existing BGZF diff --git a/include/vg/io/protobuf_iterator.hpp b/include/vg/io/protobuf_iterator.hpp index 349883b..68a8a86 100644 --- a/include/vg/io/protobuf_iterator.hpp +++ b/include/vg/io/protobuf_iterator.hpp @@ -36,7 +36,9 @@ using namespace std; template class ProtobufIterator { public: - /// Constructor + /// Constructor. Uses single-threaded decoding, so methods may be called on + /// the given stream from other code while the ProtobufIterator exists, as + /// long as no ProtobufIterator method is running. ProtobufIterator(istream& in); /////////// @@ -117,6 +119,7 @@ class ProtobufIterator { private: /// Wrap a MessageIterator and just do Protobuf parsing on top of that. + /// We always use one that is single-threaded. MessageIterator message_it; /// We always maintain a parsed version of the current message. diff --git a/include/vg/io/stream.hpp b/include/vg/io/stream.hpp index bcefc2c..b4eb422 100644 --- a/include/vg/io/stream.hpp +++ b/include/vg/io/stream.hpp @@ -128,7 +128,9 @@ void for_each(std::istream& in, lambda(it.tell_group(), *it); if (stream_length != std::numeric_limits::max()) { - // Do progress + // Do progress. + // We know ProtobufIterator uses single-threaded decompression, so + // we can act on the stream directly. progress(get_stream_position(in), stream_length); } } @@ -193,6 +195,12 @@ void for_each_parallel_impl(std::istream& in, // We do our own multi-threaded Protobuf decoding, but we batch up our // strings by pulling them from this iterator, which we also // multi-thread for decompression. + // + // Note that as long as this exists, we **may not** use the "in" + // stream! Even just to tell() it for the current position! This will + // start backgorund threads that use the stream, and on mac at least + // even a tellg() can mutate the stream internally and cause the + // background threads to segfault. MessageIterator message_it(in, false, 8); std::vector *batch = nullptr; @@ -305,8 +313,16 @@ void for_each_parallel_impl(std::istream& in, } if (stream_length != std::numeric_limits::max()) { - // Do progress - progress(get_stream_position(in), stream_length); + // Do progress. But we can't use get_stream_position because we + // can't use the stream! + // + // We also can't get at htslib's bgzf_htell, which synchronizes + // with the real read threads but isn't exposed as a symbol. + // + // So we get the virtual offset and shift off the + // non-block-address bits so it is a real backing file offset, + // just with BGZF block resolution. + progress(message_it.tell_group()>>16, stream_length); } } diff --git a/src/blocked_gzip_input_stream.cpp b/src/blocked_gzip_input_stream.cpp index e6a81c0..bf26914 100644 --- a/src/blocked_gzip_input_stream.cpp +++ b/src/blocked_gzip_input_stream.cpp @@ -11,7 +11,7 @@ namespace io { using namespace std; -BlockedGzipInputStream::BlockedGzipInputStream(std::istream& stream) : handle(nullptr), byte_count(0), +BlockedGzipInputStream::BlockedGzipInputStream(std::istream& stream, size_t thread_count) : handle(nullptr), byte_count(0), know_offset(false) { // See where the stream is @@ -30,6 +30,10 @@ BlockedGzipInputStream::BlockedGzipInputStream(std::istream& stream) : handle(nu if (handle == nullptr) { throw runtime_error("Unable to set up BGZF library on wrapped stream"); } + + if (thread_count > 1 && bgzf_mt(handle, thread_count, 256) != 0) { + throw runtime_error("Unable to set up BGZF multi-threading"); + } if (file_start >= 0 && good && (bgzf_compression(handle) == 2 || bgzf_compression(handle) == 0)) { // The stream we are wrapping is seekable, and the data is block-compressed or uncompressed @@ -261,10 +265,6 @@ bool BlockedGzipInputStream::IsBGZF() const { return handle->is_compressed && !handle->is_gzip; } -bool BlockedGzipInputStream::EnableMultiThreading(size_t thread_count) { - return bgzf_mt(handle, thread_count, 256) == 0; -} - bool BlockedGzipInputStream::SmellsLikeGzip(std::istream& in) { // TODO: We also assume that we can sniff the magic number bytes // from the input stream and then put them both back. The C spec diff --git a/src/message_iterator.cpp b/src/message_iterator.cpp index 3ceb3fa..89aee60 100644 --- a/src/message_iterator.cpp +++ b/src/message_iterator.cpp @@ -140,13 +140,8 @@ string MessageIterator::sniff_tag(::google::protobuf::io::ZeroCopyInputStream& s return tag; } -MessageIterator::MessageIterator(istream& in, bool verbose, size_t thread_count) : MessageIterator(unique_ptr(new BlockedGzipInputStream(in)), verbose) { - if (thread_count > 1) { - // After making the BGZF, turn on multithreaded decoding - if (!bgzip_in->EnableMultiThreading(thread_count)) { - throw std::runtime_error("Cound not enable multithreaded BGZF decoding"); - } - } +MessageIterator::MessageIterator(istream& in, bool verbose, size_t thread_count) : MessageIterator(unique_ptr(new BlockedGzipInputStream(in, thread_count)), verbose) { + // Nothing to do! } MessageIterator::MessageIterator(unique_ptr&& bgzf, bool verbose) :