From 3517b26161332ad33dcd424584714408074d0548 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 18 Oct 2024 20:11:15 +0000 Subject: [PATCH 01/14] partial work --- cpp/src/io/comp/uncomp.cpp | 69 ++++++++++- cpp/src/io/json/read_json.cu | 219 ++++++++++++++++++++--------------- 2 files changed, 191 insertions(+), 97 deletions(-) diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 1af45b41d8e..c5d75b455f5 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -336,9 +336,9 @@ std::vector decompress(compression_type compression, host_span 4) { auto const* fhdr = reinterpret_cast(raw); @@ -558,5 +558,72 @@ size_t decompress(compression_type compression, } } +size_t estimate_uncompressed_size(compression_type compression, host_span src) { + auto raw = src.data(); + uint8_t const* comp_data = nullptr; + size_t comp_len = 0; + size_t uncomp_len = 0; + switch (compression) { + case compression_type::GZIP: { + gz_archive_s gz; + if (ParseGZArchive(&gz, src.data(), src.size())) + return gz.isize; + } + case compression_type::ZIP: { + zip_archive_s za; + if (OpenZipArchive(&za, src.data(), src.size())) { + size_t cdfh_ofs = 0; + for (int i = 0; i < za.eocd->num_entries; i++) { + auto const* cdfh = reinterpret_cast( + reinterpret_cast(za.cdfh) + cdfh_ofs); + int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len; + if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) { + // Bad cdir + break; + } + // For now, only accept with non-zero file sizes and DEFLATE + if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { + size_t lfh_ofs = cdfh->hdr_ofs; + auto const* lfh = reinterpret_cast(raw + lfh_ofs); + if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 && + lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { + if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { + size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; + size_t file_end = file_start + lfh->comp_size; + if (file_end <= src.size()) { + // Pick the first valid file of non-zero size (only 1 file expected in archive) + return lfh->uncomp_size; + } + } + } + } + cdfh_ofs += cdfh_len; + } + } + } + case compression_type::SNAPPY: { + uint32_t uncompressed_size; + auto cur = src.begin(); + auto const end = src.end(); + // Read uncompressed length (varint) + { + uint32_t l = 0, c; + uncompressed_size = 0; + do { + c = *cur++; + auto const lo7 = c & 0x7f; + if (l >= 28 && c > 0xf) { return 0; } + uncompressed_size |= lo7 << l; + l += 7; + } while (c > 0x7f && cur < end); + CUDF_EXPECTS(uncompressed_size != 0 and cur < end, + "Destination buffer too small"); + } + return uncompressed_size; + } + default: return 0; + } +} + } // namespace io } // namespace cudf diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index c424d2b3b62..c57e6457127 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -238,6 +238,110 @@ table_with_metadata read_batch(host_span> sources, return device_parse_nested_json(buffer, reader_opts, stream, mr); } +table_with_metadata create_batched_cudf_table(host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + /* + * The batched JSON reader enforces that the size of each batch is at most INT_MAX + * bytes (~2.14GB). 
Batches are defined to be byte range chunks - characterized by
+   * chunk offset and chunk size - that may span across multiple source files.
+   * Note that the batched reader does not work for compressed inputs or for regular
+   * JSON inputs.
+   */
+  std::size_t const total_source_size = sources_size(sources, 0, 0);
+
+  // Batching is enabled only for JSONL inputs, not regular JSON files
+  CUDF_EXPECTS(
+    reader_opts.is_enabled_lines() || total_source_size < std::numeric_limits<int>::max(),
+    "Parsing Regular JSON inputs of size greater than INT_MAX bytes is not supported");
+
+  std::size_t chunk_offset = reader_opts.get_byte_range_offset();
+  std::size_t chunk_size   = reader_opts.get_byte_range_size();
+  chunk_size               = !chunk_size ? total_source_size - chunk_offset
+                                         : std::min(chunk_size, total_source_size - chunk_offset);
+
+  std::size_t const size_per_subchunk      = estimate_size_per_subchunk(chunk_size);
+  std::size_t const batch_size_upper_bound = get_batch_size_upper_bound();
+  std::size_t const batch_size =
+    batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
+
+  /*
+   * Identify the position (zero-indexed) of the starting source file from which to begin
+   * batching, based on the byte range offset. If the offset is larger than the sum of all
+   * source sizes, then start_source is the total number of source files, i.e. no file is
+   * read.
+   */
+
+  // Prefix sum of source file sizes
+  std::size_t pref_source_size = 0;
+  // Starting source file from which to begin batching, evaluated using the byte range offset
+  std::size_t const start_source = [chunk_offset, &sources, &pref_source_size]() {
+    for (std::size_t src_idx = 0; src_idx < sources.size(); ++src_idx) {
+      if (pref_source_size + sources[src_idx]->size() > chunk_offset) { return src_idx; }
+      pref_source_size += sources[src_idx]->size();
+    }
+    return sources.size();
+  }();
+  /*
+   * Construct batches of byte ranges spanning source files, with the starting position of batches
+   * indicated by `batch_offsets`. `pref_bytes_size` gives the bytes position from which the current
+   * batch begins, and `end_bytes_size` gives the terminal bytes position after which reading
+   * stops.
+   */
+  std::size_t pref_bytes_size = chunk_offset;
+  std::size_t end_bytes_size  = chunk_offset + chunk_size;
+  std::vector<std::size_t> batch_offsets{pref_bytes_size};
+  for (std::size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) {
+    pref_source_size += sources[i]->size();
+    // If the current source file can subsume multiple batches, we split the file until the
+    // boundary of the last batch exceeds the end of the file (indexed by `pref_source_size`)
+    while (pref_bytes_size < end_bytes_size &&
+           pref_source_size >= std::min(pref_bytes_size + batch_size, end_bytes_size)) {
+      auto next_batch_size = std::min(batch_size, end_bytes_size - pref_bytes_size);
+      batch_offsets.push_back(batch_offsets.back() + next_batch_size);
+      pref_bytes_size += next_batch_size;
+    }
+    i++;
+  }
+  /*
+   * If there is a single batch, then we can directly return the table without the
+   * unnecessary concatenate. The size of batch_offsets is 1 if all sources are empty,
+   * or if end_bytes_size is larger than total_source_size.
+   */
+  if (batch_offsets.size() <= 2) return read_batch(sources, reader_opts, stream, mr);
+
+  std::vector<cudf::io::table_with_metadata> partial_tables;
+  json_reader_options batched_reader_opts{reader_opts};
+  // Dispatch individual batches to read_batch and push the resulting table into the
+  // partial_tables array. Note that the reader options need to be updated for each
+  // batch to adjust byte range offset and byte range size.
+  for (std::size_t i = 0; i < batch_offsets.size() - 1; i++) {
+    batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
+    batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
+    partial_tables.emplace_back(
+      read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()));
+  }
+
+  auto expects_schema_equality =
+    std::all_of(partial_tables.begin() + 1,
+                partial_tables.end(),
+                [&gt = partial_tables[0].metadata.schema_info](auto& ptbl) {
+                  return ptbl.metadata.schema_info == gt;
+                });
+  CUDF_EXPECTS(expects_schema_equality,
+               "Mismatch in JSON schema across batches in multi-source multi-batch reading");
+
+  auto partial_table_views = std::vector<cudf::table_view>(partial_tables.size());
+  std::transform(partial_tables.begin(),
+                 partial_tables.end(),
+                 partial_table_views.begin(),
+                 [](auto const& table) { return table.tbl->view(); });
+  return table_with_metadata{cudf::concatenate(partial_table_views, stream, mr),
+                             {partial_tables[0].metadata.schema_info}};
+}
+
 } // anonymous namespace
 
 device_span<char> ingest_raw_input(device_span<char> buffer,
@@ -315,15 +419,19 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
   // Reading to host because decompression of a single block is much faster on the CPU
   sources[0]->host_read(range_offset, remaining_bytes_to_read, hbuffer.data());
   auto uncomp_data = decompress(compression, hbuffer);
+  std::printf("decompressed into host buffer\n");
   CUDF_CUDA_TRY(cudaMemcpyAsync(buffer.data(),
                                 reinterpret_cast<char*>(uncomp_data.data()),
                                 uncomp_data.size() * sizeof(char),
                                 cudaMemcpyHostToDevice,
                                 stream.value()));
+  std::printf("rekt\n");
   stream.synchronize();
   return buffer.first(uncomp_data.size());
 }
+
+
 table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
                               json_reader_options const& reader_opts,
                               rmm::cuda_stream_view stream,
@@ -336,110 +444,29 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
                  "Specifying a byte range is supported only for JSON Lines");
   }
 
-  if (sources.size() > 1) {
-    CUDF_EXPECTS(reader_opts.get_compression() == compression_type::NONE,
-                 "Multiple compressed inputs are not supported");
+  if(sources.size() > 1) {
     CUDF_EXPECTS(reader_opts.is_enabled_lines(),
                  "Multiple inputs are supported only for JSON Lines format");
   }
 
-  /*
-   * The batched JSON reader enforces that the size of each batch is at most INT_MAX
-   * bytes (~2.14GB). Batches are defined to be byte range chunks - characterized by
-   * chunk offset and chunk size - that may span across multiple source files.
-   * Note that the batched reader does not work for compressed inputs or for regular
-   * JSON inputs.
-   */
-  std::size_t const total_source_size = sources_size(sources, 0, 0);
-
-  // Batching is enabled only for JSONL inputs, not regular JSON files
-  CUDF_EXPECTS(
-    reader_opts.is_enabled_lines() || total_source_size < std::numeric_limits<int>::max(),
-    "Parsing Regular JSON inputs of size greater than INT_MAX bytes is not supported");
-
-  std::size_t chunk_offset = reader_opts.get_byte_range_offset();
-  std::size_t chunk_size   = reader_opts.get_byte_range_size();
-  chunk_size               = !chunk_size ? 
total_source_size - chunk_offset - : std::min(chunk_size, total_source_size - chunk_offset); - - std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); - std::size_t const batch_size_upper_bound = get_batch_size_upper_bound(); - std::size_t const batch_size = - batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk); - - /* - * Identify the position (zero-indexed) of starting source file from which to begin - * batching based on byte range offset. If the offset is larger than the sum of all - * source sizes, then start_source is total number of source files i.e. no file is - * read - */ - - // Prefix sum of source file sizes - std::size_t pref_source_size = 0; - // Starting source file from which to being batching evaluated using byte range offset - std::size_t const start_source = [chunk_offset, &sources, &pref_source_size]() { - for (std::size_t src_idx = 0; src_idx < sources.size(); ++src_idx) { - if (pref_source_size + sources[src_idx]->size() > chunk_offset) { return src_idx; } - pref_source_size += sources[src_idx]->size(); - } - return sources.size(); - }(); - /* - * Construct batches of byte ranges spanning source files, with the starting position of batches - * indicated by `batch_offsets`. `pref_bytes_size` gives the bytes position from which the current - * batch begins, and `end_bytes_size` gives the terminal bytes position after which reading - * stops. - */ - std::size_t pref_bytes_size = chunk_offset; - std::size_t end_bytes_size = chunk_offset + chunk_size; - std::vector batch_offsets{pref_bytes_size}; - for (std::size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) { - pref_source_size += sources[i]->size(); - // If the current source file can subsume multiple batches, we split the file until the - // boundary of the last batch exceeds the end of the file (indexed by `pref_source_size`) - while (pref_bytes_size < end_bytes_size && - pref_source_size >= std::min(pref_bytes_size + batch_size, end_bytes_size)) { - auto next_batch_size = std::min(batch_size, end_bytes_size - pref_bytes_size); - batch_offsets.push_back(batch_offsets.back() + next_batch_size); - pref_bytes_size += next_batch_size; - } - i++; + if(reader_opts.get_compression() == compression_type::NONE) + return create_batched_cudf_table(sources, reader_opts, stream, mr); + + CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 && reader_opts.get_byte_range_size() == 0, + "Byte range reading from compressed inputs is not supported"); + std::vector> compressed_buffers; + std::vector> compressed_sources; + for(size_t i = 0; i < sources.size(); i++) { + compressed_buffers.emplace_back(sources[i]->host_read(0, sources[i]->size())); + compressed_sources.emplace_back(datasource::create( + cudf::host_span(reinterpret_cast(compressed_buffers.back()->data()), compressed_buffers.back()->size()))); } - /* - * If there is a single batch, then we can directly return the table without the - * unnecessary concatenate. The size of batch_offsets is 1 if all sources are empty, - * or if end_bytes_size is larger than total_source_size. - */ - if (batch_offsets.size() <= 2) return read_batch(sources, reader_opts, stream, mr); + // TODO: in create_batched_cudf_table, we need the compressed source size to actually be the uncompressed source size for correct batching + // can we do some kind of hacky overload of size() member function? 
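+  // One plausible shape for that overload (a sketch of the idea only, not final code):
+  // a datasource wrapper whose size() reports the estimated uncompressed length, e.g.
+  //   size_t size() const override { return estimate_uncompressed_size(_comptype, _ch_buffer); }
+  // PATCH 02 below adopts exactly this shape as compressed_host_buffer_source.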
+ return create_batched_cudf_table(compressed_sources, reader_opts, stream, mr);
-
-  std::vector<cudf::io::table_with_metadata> partial_tables;
-  json_reader_options batched_reader_opts{reader_opts};
-  // Dispatch individual batches to read_batch and push the resulting table into
-  // partial_tables array. Note that the reader options need to be updated for each
-  // batch to adjust byte range offset and byte range size.
-  for (std::size_t i = 0; i < batch_offsets.size() - 1; i++) {
-    batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
-    batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
-    partial_tables.emplace_back(
-      read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()));
-  }
-  auto expects_schema_equality =
-    std::all_of(partial_tables.begin() + 1,
-                partial_tables.end(),
-                [&gt = partial_tables[0].metadata.schema_info](auto& ptbl) {
-                  return ptbl.metadata.schema_info == gt;
-                });
-  CUDF_EXPECTS(expects_schema_equality,
-               "Mismatch in JSON schema across batches in multi-source multi-batch reading");
-  auto partial_table_views = std::vector<cudf::table_view>(partial_tables.size());
-  std::transform(partial_tables.begin(),
-                 partial_tables.end(),
-                 partial_table_views.begin(),
-                 [](auto const& table) { return table.tbl->view(); });
-  return table_with_metadata{cudf::concatenate(partial_table_views, stream, mr),
-                             {partial_tables[0].metadata.schema_info}};
 }
 
 } // namespace cudf::io::json::detail

From 1cc6f464648818064a4cb4cefb7d62e24ef7eccc Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Wed, 23 Oct 2024 23:54:14 +0000
Subject: [PATCH 02/14] compressed input datasource
---
 cpp/src/io/comp/io_uncomp.hpp |   1 +
 cpp/src/io/comp/uncomp.cpp    |   5 +-
 cpp/src/io/json/read_json.cu  | 120 +++++++++++++++++++---------------
 cpp/src/io/json/read_json.hpp |  35 ++++++++++
 4 files changed, 105 insertions(+), 56 deletions(-)

diff --git a/cpp/src/io/comp/io_uncomp.hpp b/cpp/src/io/comp/io_uncomp.hpp
index 1c9578fa5c0..ccab1ad77d3 100644
--- a/cpp/src/io/comp/io_uncomp.hpp
+++ b/cpp/src/io/comp/io_uncomp.hpp
@@ -43,6 +43,7 @@ size_t decompress(compression_type compression,
                   host_span<uint8_t> dst,
                   rmm::cuda_stream_view stream);
 
+size_t estimate_uncompressed_size(compression_type compression, host_span<uint8_t const> src);
 /**
  * @brief GZIP header flags
  * See https://tools.ietf.org/html/rfc1952
diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp
index c5d75b455f5..5d943e4f4c4 100644
--- a/cpp/src/io/comp/uncomp.cpp
+++ b/cpp/src/io/comp/uncomp.cpp
@@ -560,10 +560,9 @@ size_t decompress(compression_type compression,
 size_t estimate_uncompressed_size(compression_type compression, host_span<uint8_t const> src) {
   auto raw = src.data();
-  uint8_t const* comp_data = nullptr;
-  size_t comp_len = 0;
-  size_t uncomp_len = 0;
   switch (compression) {
+    case compression_type::NONE:
+      return src.size();
     case compression_type::GZIP: {
       gz_archive_s gz;
       if (ParseGZArchive(&gz, src.data(), src.size()))
         return gz.isize;
     }
diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index c57e6457127..063dd213316 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -144,12 +144,16 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
   // The allocation for single source compressed input is estimated by assuming a ~4:1
   // compression ratio. For uncompressed inputs, we can get a better estimate using the idea
   // of subchunks.
-  auto constexpr header_size = 4096;
+  //auto constexpr header_size = 4096;
+  /*
   std::size_t buffer_size =
     reader_compression != compression_type::NONE
      ? 
total_source_size * estimated_compression_ratio + header_size : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + num_extra_delimiters; + */ + std::size_t buffer_size = std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + + num_extra_delimiters; rmm::device_buffer buffer(buffer_size, stream); device_span bufspan(reinterpret_cast(buffer.data()), buffer.size()); @@ -193,11 +197,14 @@ datasource::owning_buffer get_record_range_raw_input( // Our buffer_size estimate is insufficient to read until the end of the line! We need to // allocate more memory and try again! num_subchunks_prealloced *= 2; + /* buffer_size = reader_compression != compression_type::NONE ? 2 * buffer_size : std::min(total_source_size, buffer_size + num_subchunks_prealloced * size_per_subchunk) + num_extra_delimiters; + */ + buffer_size = std::min(total_source_size, buffer_size + num_subchunks_prealloced * size_per_subchunk) + num_extra_delimiters; buffer.resize(buffer_size, stream); bufspan = device_span(reinterpret_cast(buffer.data()), buffer.size()); } @@ -357,30 +364,30 @@ device_span ingest_raw_input(device_span buffer, // delimiter. auto constexpr num_delimiter_chars = 1; - if (compression == compression_type::NONE) { - auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); - std::vector prefsum_source_sizes(sources.size()); - std::vector> h_buffers; - std::size_t bytes_read = 0; - std::transform_inclusive_scan(sources.begin(), - sources.end(), - prefsum_source_sizes.begin(), - std::plus{}, - [](std::unique_ptr const& s) { return s->size(); }); - auto upper = - std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); - std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); - - auto const total_bytes_to_read = - std::min(range_size, prefsum_source_sizes.back() - range_offset); - range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0; - for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; - i++) { - if (sources[i]->is_empty()) continue; - auto data_size = - std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); - auto destination = reinterpret_cast(buffer.data()) + bytes_read + - (num_delimiter_chars * delimiter_map.size()); + auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); + std::vector prefsum_source_sizes(sources.size()); + std::vector> h_buffers; + std::size_t bytes_read = 0; + std::transform_inclusive_scan(sources.begin(), + sources.end(), + prefsum_source_sizes.begin(), + std::plus{}, + [](std::unique_ptr const& s) { return s->size(); }); + auto upper = + std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); + std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); + + auto const total_bytes_to_read = + std::min(range_size, prefsum_source_sizes.back() - range_offset); + range_offset -= start_source ? 
prefsum_source_sizes[start_source - 1] : 0; + for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; + i++) { + if (sources[i]->is_empty()) continue; + auto data_size = + std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); + auto destination = reinterpret_cast(buffer.data()) + bytes_read + + (num_delimiter_chars * delimiter_map.size()); + if (compression == compression_type::NONE) { if (sources[i]->is_device_read_preferred(data_size)) { bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); } else { @@ -390,28 +397,36 @@ device_span ingest_raw_input(device_span buffer, destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); bytes_read += h_buffer->size(); } - range_offset = 0; - delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size())); } - // Removing delimiter inserted after last non-empty source is read - if (!delimiter_map.empty()) { delimiter_map.pop_back(); } - - // If this is a multi-file source, we scatter the JSON line delimiters between files - if (sources.size() > 1) { - static_assert(num_delimiter_chars == 1, - "Currently only single-character delimiters are supported"); - auto const delimiter_source = thrust::make_constant_iterator('\n'); - auto const d_delimiter_map = cudf::detail::make_device_uvector_async( - delimiter_map, stream, cudf::get_current_device_resource_ref()); - thrust::scatter(rmm::exec_policy_nosync(stream), - delimiter_source, - delimiter_source + d_delimiter_map.size(), - d_delimiter_map.data(), - buffer.data()); + else { + h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); + auto const& h_buffer = h_buffers.back(); + CUDF_CUDA_TRY(cudaMemcpyAsync( + destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); + bytes_read += h_buffer->size(); } - stream.synchronize(); - return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); + range_offset = 0; + delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size())); + } + // Removing delimiter inserted after last non-empty source is read + if (!delimiter_map.empty()) { delimiter_map.pop_back(); } + + // If this is a multi-file source, we scatter the JSON line delimiters between files + if (sources.size() > 1 && !delimiter_map.empty()) { + static_assert(num_delimiter_chars == 1, + "Currently only single-character delimiters are supported"); + auto const delimiter_source = thrust::make_constant_iterator('\n'); + auto const d_delimiter_map = cudf::detail::make_device_uvector_async( + delimiter_map, stream, cudf::get_current_device_resource_ref()); + thrust::scatter(rmm::exec_policy_nosync(stream), + delimiter_source, + delimiter_source + d_delimiter_map.size(), + d_delimiter_map.data(), + buffer.data()); } + stream.synchronize(); + return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); + /* // TODO: allow byte range reading from multiple compressed files. 
auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset); auto hbuffer = std::vector(remaining_bytes_to_read); @@ -428,10 +443,9 @@ device_span ingest_raw_input(device_span buffer, std::printf("rekt\n"); stream.synchronize(); return buffer.first(uncomp_data.size()); + */ } - - table_with_metadata read_json(host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream, @@ -452,21 +466,21 @@ table_with_metadata read_json(host_span> sources, if(reader_opts.get_compression() == compression_type::NONE) return create_batched_cudf_table(sources, reader_opts, stream, mr); + /* CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 && reader_opts.get_byte_range_size() == 0, "Byte range reading from compressed inputs is not supported"); + */ + CUDF_EXPECTS(reader_opts.get_compression() == compression_type::GZIP || reader_opts.get_compression() == compression_type::ZIP || reader_opts.get_compression() == compression_type::SNAPPY, "Unsupported compression type"); std::vector> compressed_buffers; std::vector> compressed_sources; for(size_t i = 0; i < sources.size(); i++) { compressed_buffers.emplace_back(sources[i]->host_read(0, sources[i]->size())); - compressed_sources.emplace_back(datasource::create( - cudf::host_span(reinterpret_cast(compressed_buffers.back()->data()), compressed_buffers.back()->size()))); + compressed_sources.emplace_back(std::make_unique( + cudf::host_span(reinterpret_cast(compressed_buffers.back()->data()), compressed_buffers.back()->size()), + reader_opts.get_compression())); } - // TODO: in create_batched_cudf_table, we need the compressed source size to actually be the uncompressed source size for correct batching - // can we do some kind of hacky overload of size() member function? 
+ // in create_batched_cudf_table, we need the compressed source size to actually be the uncompressed source size for correct batching return create_batched_cudf_table(compressed_sources, reader_opts, stream, mr); - - - } } // namespace cudf::io::json::detail diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 982190eecb5..d0d4a21bbfe 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -16,6 +16,8 @@ #pragma once +#include "../comp/io_uncomp.hpp" + #include #include #include @@ -73,5 +75,38 @@ table_with_metadata read_json(host_span> sources, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); +class compressed_host_buffer_source final : public datasource { + public: + explicit compressed_host_buffer_source(cudf::host_span ch_buffer, compression_type comptype) : _ch_buffer{ch_buffer}, _comptype{comptype} { + _decompressed_ch_buffer_size = estimate_uncompressed_size(_comptype, _ch_buffer); + } + + size_t host_read(size_t offset, size_t size, uint8_t* dst) override + { + auto decompressed_hbuf = decompress(_comptype, _ch_buffer); + auto const count = std::min(size, decompressed_hbuf.size() - offset); + std::memcpy(dst, decompressed_hbuf.data() + offset, count); + return count; + } + + std::unique_ptr host_read(size_t offset, size_t size) override + { + auto decompressed_hbuf = decompress(_comptype, _ch_buffer); + auto const count = std::min(size, decompressed_hbuf.size() - offset); + return std::make_unique>>(std::move(decompressed_hbuf), decompressed_hbuf.data() + offset, count); + } + + [[nodiscard]] bool supports_device_read() const override { return false; } + + [[nodiscard]] size_t size() const override { + return _decompressed_ch_buffer_size; + } + + private: + cudf::host_span _ch_buffer; ///< A non-owning view of the existing host data + compression_type _comptype; + size_t _decompressed_ch_buffer_size; +}; + } // namespace io::json::detail } // namespace CUDF_EXPORT cudf From 1f462232f783b9b2525c4df4b7352ef2cb418bac Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Wed, 23 Oct 2024 23:56:16 +0000 Subject: [PATCH 03/14] formatting --- cpp/src/io/comp/io_uncomp.hpp | 2 +- cpp/src/io/comp/uncomp.cpp | 14 ++++---- cpp/src/io/json/read_json.cu | 49 ++++++++++++++------------- cpp/src/io/json/read_json.hpp | 62 ++++++++++++++++++----------------- 4 files changed, 66 insertions(+), 61 deletions(-) diff --git a/cpp/src/io/comp/io_uncomp.hpp b/cpp/src/io/comp/io_uncomp.hpp index ccab1ad77d3..487d76db06a 100644 --- a/cpp/src/io/comp/io_uncomp.hpp +++ b/cpp/src/io/comp/io_uncomp.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. + * Copyright (c) 2018-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
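The pairing these patches establish: estimate_uncompressed_size() sizes the output
cheaply from container metadata (the GZIP ISIZE footer, the ZIP local file header,
Snappy's leading length varint), and decompress() then fills the buffer. A minimal
sketch of the intended call pattern, illustrative only; `compressed` and the
load_gzip_bytes() helper are hypothetical:

    // Sketch, not part of the patch: estimate, allocate, then decompress on the host.
    std::vector<uint8_t> compressed = load_gzip_bytes();  // hypothetical helper
    cudf::host_span<uint8_t const> src{compressed.data(), compressed.size()};
    auto const est = cudf::io::estimate_uncompressed_size(cudf::io::compression_type::GZIP, src);
    std::vector<uint8_t> dst(est);
    auto const written = cudf::io::decompress(cudf::io::compression_type::GZIP,
                                              src,
                                              cudf::host_span<uint8_t>{dst.data(), dst.size()},
                                              cudf::get_default_stream());
    dst.resize(written);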
diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 5d943e4f4c4..76133e940e5 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -558,15 +558,14 @@ size_t decompress(compression_type compression, } } -size_t estimate_uncompressed_size(compression_type compression, host_span src) { - auto raw = src.data(); +size_t estimate_uncompressed_size(compression_type compression, host_span src) +{ + auto raw = src.data(); switch (compression) { - case compression_type::NONE: - return src.size(); + case compression_type::NONE: return src.size(); case compression_type::GZIP: { gz_archive_s gz; - if (ParseGZArchive(&gz, src.data(), src.size())) - return gz.isize; + if (ParseGZArchive(&gz, src.data(), src.size())) return gz.isize; } case compression_type::ZIP: { zip_archive_s za; @@ -615,8 +614,7 @@ size_t estimate_uncompressed_size(compression_type compression, host_span 0x7f && cur < end); - CUDF_EXPECTS(uncompressed_size != 0 and cur < end, - "Destination buffer too small"); + CUDF_EXPECTS(uncompressed_size != 0 and cur < end, "Destination buffer too small"); } return uncompressed_size; } diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 063dd213316..18f9363d34c 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -144,7 +144,7 @@ datasource::owning_buffer get_record_range_raw_input( // The allocation for single source compressed input is estimated by assuming a ~4:1 // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea // of subchunks. - //auto constexpr header_size = 4096; + // auto constexpr header_size = 4096; /* std::size_t buffer_size = reader_compression != compression_type::NONE @@ -152,8 +152,9 @@ datasource::owning_buffer get_record_range_raw_input( : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + num_extra_delimiters; */ - std::size_t buffer_size = std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + - num_extra_delimiters; + std::size_t buffer_size = + std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + + num_extra_delimiters; rmm::device_buffer buffer(buffer_size, stream); device_span bufspan(reinterpret_cast(buffer.data()), buffer.size()); @@ -204,7 +205,9 @@ datasource::owning_buffer get_record_range_raw_input( buffer_size + num_subchunks_prealloced * size_per_subchunk) + num_extra_delimiters; */ - buffer_size = std::min(total_source_size, buffer_size + num_subchunks_prealloced * size_per_subchunk) + num_extra_delimiters; + buffer_size = std::min(total_source_size, + buffer_size + num_subchunks_prealloced * size_per_subchunk) + + num_extra_delimiters; buffer.resize(buffer_size, stream); bufspan = device_span(reinterpret_cast(buffer.data()), buffer.size()); } @@ -246,9 +249,9 @@ table_with_metadata read_batch(host_span> sources, } table_with_metadata create_batched_cudf_table(host_span> sources, - json_reader_options const& reader_opts, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { /* * The batched JSON reader enforces that the size of each batch is at most INT_MAX @@ -377,14 +380,11 @@ device_span ingest_raw_input(device_span buffer, std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); 
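  // Worked example of the lookup above (a sketch, not patch content): with source sizes
  // {100, 50, 200}, prefsum_source_sizes = {100, 150, 350}. A range_offset of 120 makes
  // upper_bound() stop at the element 150, so start_source = 1, and the adjusted local
  // offset becomes 120 - prefsum_source_sizes[0] = 20 into the second source.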
- auto const total_bytes_to_read = - std::min(range_size, prefsum_source_sizes.back() - range_offset); + auto const total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset); range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0; - for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; - i++) { + for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) { if (sources[i]->is_empty()) continue; - auto data_size = - std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); + auto data_size = std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); auto destination = reinterpret_cast(buffer.data()) + bytes_read + (num_delimiter_chars * delimiter_map.size()); if (compression == compression_type::NONE) { @@ -397,8 +397,7 @@ device_span ingest_raw_input(device_span buffer, destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); bytes_read += h_buffer->size(); } - } - else { + } else { h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); auto const& h_buffer = h_buffers.back(); CUDF_CUDA_TRY(cudaMemcpyAsync( @@ -458,28 +457,34 @@ table_with_metadata read_json(host_span> sources, "Specifying a byte range is supported only for JSON Lines"); } - if(sources.size() > 1) { + if (sources.size() > 1) { CUDF_EXPECTS(reader_opts.is_enabled_lines(), "Multiple inputs are supported only for JSON Lines format"); } - if(reader_opts.get_compression() == compression_type::NONE) + if (reader_opts.get_compression() == compression_type::NONE) return create_batched_cudf_table(sources, reader_opts, stream, mr); /* CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 && reader_opts.get_byte_range_size() == 0, "Byte range reading from compressed inputs is not supported"); */ - CUDF_EXPECTS(reader_opts.get_compression() == compression_type::GZIP || reader_opts.get_compression() == compression_type::ZIP || reader_opts.get_compression() == compression_type::SNAPPY, "Unsupported compression type"); + CUDF_EXPECTS(reader_opts.get_compression() == compression_type::GZIP || + reader_opts.get_compression() == compression_type::ZIP || + reader_opts.get_compression() == compression_type::SNAPPY, + "Unsupported compression type"); std::vector> compressed_buffers; std::vector> compressed_sources; - for(size_t i = 0; i < sources.size(); i++) { + for (size_t i = 0; i < sources.size(); i++) { compressed_buffers.emplace_back(sources[i]->host_read(0, sources[i]->size())); compressed_sources.emplace_back(std::make_unique( - cudf::host_span(reinterpret_cast(compressed_buffers.back()->data()), compressed_buffers.back()->size()), - reader_opts.get_compression())); + cudf::host_span( + reinterpret_cast(compressed_buffers.back()->data()), + compressed_buffers.back()->size()), + reader_opts.get_compression())); } - // in create_batched_cudf_table, we need the compressed source size to actually be the uncompressed source size for correct batching + // in create_batched_cudf_table, we need the compressed source size to actually be the + // uncompressed source size for correct batching return create_batched_cudf_table(compressed_sources, reader_opts, stream, mr); } diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index d0d4a21bbfe..d8ed3ec2820 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -76,36 +76,38 @@ table_with_metadata read_json(host_span> sources, 
rmm::device_async_resource_ref mr);
 
 class compressed_host_buffer_source final : public datasource {
- public:
-  explicit compressed_host_buffer_source(cudf::host_span<uint8_t const> ch_buffer, compression_type comptype) : _ch_buffer{ch_buffer}, _comptype{comptype} {
-    _decompressed_ch_buffer_size = estimate_uncompressed_size(_comptype, _ch_buffer);
-  }
-
-  size_t host_read(size_t offset, size_t size, uint8_t* dst) override
-  {
-    auto decompressed_hbuf = decompress(_comptype, _ch_buffer);
-    auto const count = std::min(size, decompressed_hbuf.size() - offset);
-    std::memcpy(dst, decompressed_hbuf.data() + offset, count);
-    return count;
-  }
-
-  std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) override
-  {
-    auto decompressed_hbuf = decompress(_comptype, _ch_buffer);
-    auto const count = std::min(size, decompressed_hbuf.size() - offset);
-    return std::make_unique<owning_buffer<std::vector<uint8_t>>>(
-      std::move(decompressed_hbuf), decompressed_hbuf.data() + offset, count);
-  }
-
-  [[nodiscard]] bool supports_device_read() const override { return false; }
-
-  [[nodiscard]] size_t size() const override {
-    return _decompressed_ch_buffer_size;
-  }
-
- private:
-  cudf::host_span<uint8_t const> _ch_buffer; ///< A non-owning view of the existing host data
-  compression_type _comptype;
-  size_t _decompressed_ch_buffer_size;
+ public:
+  explicit compressed_host_buffer_source(cudf::host_span<uint8_t const> ch_buffer,
+                                         compression_type comptype)
+    : _ch_buffer{ch_buffer}, _comptype{comptype}
+  {
+    _decompressed_ch_buffer_size = estimate_uncompressed_size(_comptype, _ch_buffer);
+  }
+
+  size_t host_read(size_t offset, size_t size, uint8_t* dst) override
+  {
+    auto decompressed_hbuf = decompress(_comptype, _ch_buffer);
+    auto const count       = std::min(size, decompressed_hbuf.size() - offset);
+    std::memcpy(dst, decompressed_hbuf.data() + offset, count);
+    return count;
+  }
+
+  std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) override
+  {
+    auto decompressed_hbuf = decompress(_comptype, _ch_buffer);
+    auto const count       = std::min(size, decompressed_hbuf.size() - offset);
+    return std::make_unique<owning_buffer<std::vector<uint8_t>>>(
+      std::move(decompressed_hbuf), decompressed_hbuf.data() + offset, count);
+  }
+
+  [[nodiscard]] bool supports_device_read() const override { return false; }
+
+  [[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; }
+
+ private:
+  cudf::host_span<uint8_t const> _ch_buffer;  ///< A non-owning view of the existing host data
+  compression_type _comptype;
+  size_t _decompressed_ch_buffer_size;
 };
 
 } // namespace io::json::detail

From 334ef068a2054a27ef6c7b57b6a1eaee79a4fffe Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Thu, 24 Oct 2024 00:57:34 +0000
Subject: [PATCH 04/14] improving the datasource
---
 cpp/src/io/json/read_json.hpp | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp
index d8ed3ec2820..4e85ebba7ac 100644
--- a/cpp/src/io/json/read_json.hpp
+++ b/cpp/src/io/json/read_json.hpp
@@ -82,6 +82,7 @@ class compressed_host_buffer_source final : public datasource {
     : _ch_buffer{ch_buffer}, _comptype{comptype}
   {
     _decompressed_ch_buffer_size = estimate_uncompressed_size(_comptype, _ch_buffer);
+    _decompressed_buffer.resize(0);
   }
 
   size_t host_read(size_t offset, size_t size, uint8_t* dst) override
@@ -94,10 +95,17 @@ std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) override
   {
-    auto decompressed_hbuf = decompress(_comptype, _ch_buffer);
-    auto const count       = std::min(size, decompressed_hbuf.size() - 
offset); - return std::make_unique>>( - std::move(decompressed_hbuf), decompressed_hbuf.data() + offset, count); + if (_decompressed_buffer.empty()) { + auto decompressed_hbuf = decompress(_comptype, _ch_buffer); + auto const count = std::min(size, decompressed_hbuf.size() - offset); + bool partial_read = offset + count < decompressed_hbuf.size(); + if (!partial_read) + return std::make_unique>>( + std::move(decompressed_hbuf), decompressed_hbuf.data() + offset, count); + _decompressed_buffer = std::move(decompressed_hbuf); + } + auto const count = std::min(size, _decompressed_buffer.size() - offset); + return std::make_unique(_decompressed_buffer.data() + offset, count); } [[nodiscard]] bool supports_device_read() const override { return false; } @@ -108,6 +116,7 @@ class compressed_host_buffer_source final : public datasource { cudf::host_span _ch_buffer; ///< A non-owning view of the existing host data compression_type _comptype; size_t _decompressed_ch_buffer_size; + std::vector _decompressed_buffer; }; } // namespace io::json::detail From 839bddad2e98e7b04c99dc940df4c1ff20cd51be Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Thu, 24 Oct 2024 16:57:26 +0000 Subject: [PATCH 05/14] cleanup --- cpp/src/io/json/read_json.cu | 52 ++---------------------------------- 1 file changed, 2 insertions(+), 50 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 18f9363d34c..b6b2d4b8736 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -141,17 +141,6 @@ datasource::owning_buffer get_record_range_raw_input( int num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced; std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); - // The allocation for single source compressed input is estimated by assuming a ~4:1 - // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea - // of subchunks. - // auto constexpr header_size = 4096; - /* - std::size_t buffer_size = - reader_compression != compression_type::NONE - ? total_source_size * estimated_compression_ratio + header_size - : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + - num_extra_delimiters; - */ std::size_t buffer_size = std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + num_extra_delimiters; @@ -198,13 +187,6 @@ datasource::owning_buffer get_record_range_raw_input( // Our buffer_size estimate is insufficient to read until the end of the line! We need to // allocate more memory and try again! num_subchunks_prealloced *= 2; - /* - buffer_size = reader_compression != compression_type::NONE - ? 
2 * buffer_size - : std::min(total_source_size, - buffer_size + num_subchunks_prealloced * size_per_subchunk) + - num_extra_delimiters; - */ buffer_size = std::min(total_source_size, buffer_size + num_subchunks_prealloced * size_per_subchunk) + num_extra_delimiters; @@ -387,16 +369,8 @@ device_span ingest_raw_input(device_span buffer, auto data_size = std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); auto destination = reinterpret_cast(buffer.data()) + bytes_read + (num_delimiter_chars * delimiter_map.size()); - if (compression == compression_type::NONE) { - if (sources[i]->is_device_read_preferred(data_size)) { - bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); - } else { - h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); - auto const& h_buffer = h_buffers.back(); - CUDF_CUDA_TRY(cudaMemcpyAsync( - destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); - bytes_read += h_buffer->size(); - } + if (sources[i]->is_device_read_preferred(data_size)) { + bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); } else { h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); auto const& h_buffer = h_buffers.back(); @@ -425,24 +399,6 @@ device_span ingest_raw_input(device_span buffer, } stream.synchronize(); return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); - /* - // TODO: allow byte range reading from multiple compressed files. - auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset); - auto hbuffer = std::vector(remaining_bytes_to_read); - // Single read because only a single compressed source is supported - // Reading to host because decompression of a single block is much faster on the CPU - sources[0]->host_read(range_offset, remaining_bytes_to_read, hbuffer.data()); - auto uncomp_data = decompress(compression, hbuffer); - std::printf("decompressed into host buffer\n"); - CUDF_CUDA_TRY(cudaMemcpyAsync(buffer.data(), - reinterpret_cast(uncomp_data.data()), - uncomp_data.size() * sizeof(char), - cudaMemcpyHostToDevice, - stream.value())); - std::printf("rekt\n"); - stream.synchronize(); - return buffer.first(uncomp_data.size()); - */ } table_with_metadata read_json(host_span> sources, @@ -465,10 +421,6 @@ table_with_metadata read_json(host_span> sources, if (reader_opts.get_compression() == compression_type::NONE) return create_batched_cudf_table(sources, reader_opts, stream, mr); - /* - CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 && reader_opts.get_byte_range_size() == 0, - "Byte range reading from compressed inputs is not supported"); - */ CUDF_EXPECTS(reader_opts.get_compression() == compression_type::GZIP || reader_opts.get_compression() == compression_type::ZIP || reader_opts.get_compression() == compression_type::SNAPPY, From 42a4b1b3547d35bb25a68573b83a79bd418dff13 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Thu, 24 Oct 2024 17:44:47 +0000 Subject: [PATCH 06/14] slow path for some compression formats --- cpp/src/io/json/read_json.cu | 4 ---- cpp/src/io/json/read_json.hpp | 10 ++++++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index b6b2d4b8736..4178c790265 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -421,10 +421,6 @@ table_with_metadata read_json(host_span> sources, if (reader_opts.get_compression() == compression_type::NONE) 
return create_batched_cudf_table(sources, reader_opts, stream, mr); - CUDF_EXPECTS(reader_opts.get_compression() == compression_type::GZIP || - reader_opts.get_compression() == compression_type::ZIP || - reader_opts.get_compression() == compression_type::SNAPPY, - "Unsupported compression type"); std::vector> compressed_buffers; std::vector> compressed_sources; for (size_t i = 0; i < sources.size(); i++) { diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 4e85ebba7ac..00aaf82cb05 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -81,8 +81,14 @@ class compressed_host_buffer_source final : public datasource { compression_type comptype) : _ch_buffer{ch_buffer}, _comptype{comptype} { - _decompressed_ch_buffer_size = estimate_uncompressed_size(_comptype, _ch_buffer); - _decompressed_buffer.resize(0); + if (comptype == compression_type::GZIP || comptype == compression_type::ZIP || + comptype == compression_type::SNAPPY) { + _decompressed_ch_buffer_size = estimate_uncompressed_size(_comptype, _ch_buffer); + _decompressed_buffer.resize(0); + } else { + _decompressed_buffer = decompress(_comptype, _ch_buffer); + _decompressed_ch_buffer_size = _decompressed_buffer.size(); + } } size_t host_read(size_t offset, size_t size, uint8_t* dst) override From c3b6cb31abb8f7b7726032e36a4d754b0831d478 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Thu, 24 Oct 2024 18:19:24 +0000 Subject: [PATCH 07/14] cleanup --- cpp/src/io/json/read_json.cu | 51 +++++++++++++++++++++++++++++++++++ cpp/src/io/json/read_json.hpp | 51 ----------------------------------- 2 files changed, 51 insertions(+), 51 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 4178c790265..238a5056316 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,56 @@ namespace cudf::io::json::detail { namespace { +class compressed_host_buffer_source final : public datasource { + public: + explicit compressed_host_buffer_source(cudf::host_span ch_buffer, + compression_type comptype) + : _ch_buffer{ch_buffer}, _comptype{comptype} + { + if (comptype == compression_type::GZIP || comptype == compression_type::ZIP || + comptype == compression_type::SNAPPY) { + _decompressed_ch_buffer_size = estimate_uncompressed_size(_comptype, _ch_buffer); + _decompressed_buffer.resize(0); + } else { + _decompressed_buffer = decompress(_comptype, _ch_buffer); + _decompressed_ch_buffer_size = _decompressed_buffer.size(); + } + } + + size_t host_read(size_t offset, size_t size, uint8_t* dst) override + { + auto decompressed_hbuf = decompress(_comptype, _ch_buffer); + auto const count = std::min(size, decompressed_hbuf.size() - offset); + std::memcpy(dst, decompressed_hbuf.data() + offset, count); + return count; + } + + std::unique_ptr host_read(size_t offset, size_t size) override + { + if (_decompressed_buffer.empty()) { + auto decompressed_hbuf = decompress(_comptype, _ch_buffer); + auto const count = std::min(size, decompressed_hbuf.size() - offset); + bool partial_read = offset + count < decompressed_hbuf.size(); + if (!partial_read) + return std::make_unique>>( + std::move(decompressed_hbuf), decompressed_hbuf.data() + offset, count); + _decompressed_buffer = std::move(decompressed_hbuf); + } + auto const count = std::min(size, _decompressed_buffer.size() - offset); + return std::make_unique(_decompressed_buffer.data() + offset, count); + } + + 
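+  // Note on host_read(offset, size) above (describing the behavior as written, not new
+  // logic): the first partial read decompresses once and caches the result in
+  // _decompressed_buffer, so subsequent range requests are served from the cache; a
+  // read that covers the whole decompressed stream instead moves the vector into an
+  // owning_buffer and never populates the cache.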
[[nodiscard]] bool supports_device_read() const override { return false; } + + [[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; } + + private: + cudf::host_span _ch_buffer; ///< A non-owning view of the existing host data + compression_type _comptype; + size_t _decompressed_ch_buffer_size; + std::vector _decompressed_buffer; +}; + // Return total size of sources enclosing the passed range std::size_t sources_size(host_span> const sources, std::size_t range_offset, diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 00aaf82cb05..536efcfdba6 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -18,7 +18,6 @@ #include "../comp/io_uncomp.hpp" -#include #include #include #include @@ -75,55 +74,5 @@ table_with_metadata read_json(host_span> sources, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); -class compressed_host_buffer_source final : public datasource { - public: - explicit compressed_host_buffer_source(cudf::host_span ch_buffer, - compression_type comptype) - : _ch_buffer{ch_buffer}, _comptype{comptype} - { - if (comptype == compression_type::GZIP || comptype == compression_type::ZIP || - comptype == compression_type::SNAPPY) { - _decompressed_ch_buffer_size = estimate_uncompressed_size(_comptype, _ch_buffer); - _decompressed_buffer.resize(0); - } else { - _decompressed_buffer = decompress(_comptype, _ch_buffer); - _decompressed_ch_buffer_size = _decompressed_buffer.size(); - } - } - - size_t host_read(size_t offset, size_t size, uint8_t* dst) override - { - auto decompressed_hbuf = decompress(_comptype, _ch_buffer); - auto const count = std::min(size, decompressed_hbuf.size() - offset); - std::memcpy(dst, decompressed_hbuf.data() + offset, count); - return count; - } - - std::unique_ptr host_read(size_t offset, size_t size) override - { - if (_decompressed_buffer.empty()) { - auto decompressed_hbuf = decompress(_comptype, _ch_buffer); - auto const count = std::min(size, decompressed_hbuf.size() - offset); - bool partial_read = offset + count < decompressed_hbuf.size(); - if (!partial_read) - return std::make_unique>>( - std::move(decompressed_hbuf), decompressed_hbuf.data() + offset, count); - _decompressed_buffer = std::move(decompressed_hbuf); - } - auto const count = std::min(size, _decompressed_buffer.size() - offset); - return std::make_unique(_decompressed_buffer.data() + offset, count); - } - - [[nodiscard]] bool supports_device_read() const override { return false; } - - [[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; } - - private: - cudf::host_span _ch_buffer; ///< A non-owning view of the existing host data - compression_type _comptype; - size_t _decompressed_ch_buffer_size; - std::vector _decompressed_buffer; -}; - } // namespace io::json::detail } // namespace CUDF_EXPORT cudf From e116fa7bb0917139edb27815f8c6d4c4f995bd8a Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Thu, 24 Oct 2024 18:46:13 +0000 Subject: [PATCH 08/14] remove include --- cpp/src/io/json/read_json.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 536efcfdba6..982190eecb5 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -16,8 +16,7 @@ #pragma once -#include "../comp/io_uncomp.hpp" - +#include #include #include #include From 3cd7c1d1986cf9bac24b3011ad24548cc025cfe6 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 25 Oct 2024 
18:19:45 +0000 Subject: [PATCH 09/14] pr feedback --- cpp/src/io/json/read_json.cu | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 238a5056316..8ce8117cddd 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -44,10 +44,11 @@ namespace { class compressed_host_buffer_source final : public datasource { public: - explicit compressed_host_buffer_source(cudf::host_span ch_buffer, - compression_type comptype) - : _ch_buffer{ch_buffer}, _comptype{comptype} + explicit compressed_host_buffer_source(std::unique_ptr src, compression_type comptype) + : _comptype{comptype} { + auto dbuf = src->host_read(0, src->size()); + _ch_buffer = std::vector(dbuf->data(), dbuf->data() + dbuf->size()); if (comptype == compression_type::GZIP || comptype == compression_type::ZIP || comptype == compression_type::SNAPPY) { _decompressed_ch_buffer_size = estimate_uncompressed_size(_comptype, _ch_buffer); @@ -86,7 +87,7 @@ class compressed_host_buffer_source final : public datasource { [[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; } private: - cudf::host_span _ch_buffer; ///< A non-owning view of the existing host data + std::vector _ch_buffer; compression_type _comptype; size_t _decompressed_ch_buffer_size; std::vector _decompressed_buffer; @@ -472,15 +473,10 @@ table_with_metadata read_json(host_span> sources, if (reader_opts.get_compression() == compression_type::NONE) return create_batched_cudf_table(sources, reader_opts, stream, mr); - std::vector> compressed_buffers; std::vector> compressed_sources; for (size_t i = 0; i < sources.size(); i++) { - compressed_buffers.emplace_back(sources[i]->host_read(0, sources[i]->size())); compressed_sources.emplace_back(std::make_unique( - cudf::host_span( - reinterpret_cast(compressed_buffers.back()->data()), - compressed_buffers.back()->size()), - reader_opts.get_compression())); + std::move(sources[i]), reader_opts.get_compression())); } // in create_batched_cudf_table, we need the compressed source size to actually be the // uncompressed source size for correct batching From 4bd817efc3b4cb2e1869204e4a71daa71b5fdb68 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Wed, 6 Nov 2024 12:04:57 +0000 Subject: [PATCH 10/14] reorg and cleanup --- cpp/src/io/comp/io_uncomp.hpp | 4 +- cpp/src/io/comp/uncomp.cpp | 334 ++++++++++++++++++---------------- cpp/src/io/json/read_json.hpp | 2 + 3 files changed, 186 insertions(+), 154 deletions(-) diff --git a/cpp/src/io/comp/io_uncomp.hpp b/cpp/src/io/comp/io_uncomp.hpp index 487d76db06a..d6ff2091c30 100644 --- a/cpp/src/io/comp/io_uncomp.hpp +++ b/cpp/src/io/comp/io_uncomp.hpp @@ -25,7 +25,7 @@ using cudf::host_span; -namespace cudf { +namespace CUDF_EXPORT cudf { namespace io { /** @@ -57,4 +57,4 @@ constexpr uint8_t fcomment = 0x10; // Comment present }; // namespace GZIPHeaderFlag } // namespace io -} // namespace cudf +} // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 06ee63a87f0..60d715fa5ce 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -14,6 +14,7 @@ * limitations under the License. 
*/ +#include "gpuinflate.hpp" #include "io/utilities/hostdevice_vector.hpp" #include "io_uncomp.hpp" #include "nvcomp_adapter.hpp" @@ -24,8 +25,6 @@ #include #include -#include - #include // uncompress #include // memset @@ -278,124 +277,6 @@ void cpu_inflate_vector(std::vector& dst, uint8_t const* comp_data, siz CUDF_EXPECTS(zerr == Z_STREAM_END, "Error in DEFLATE stream"); } -std::vector decompress(compression_type compression, host_span src) -{ - CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); - CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); - - auto raw = src.data(); - uint8_t const* comp_data = nullptr; - size_t comp_len = 0; - size_t uncomp_len = 0; - - switch (compression) { - case compression_type::AUTO: - case compression_type::GZIP: { - gz_archive_s gz; - if (ParseGZArchive(&gz, raw, src.size())) { - compression = compression_type::GZIP; - comp_data = gz.comp_data; - comp_len = gz.comp_len; - uncomp_len = gz.isize; - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - } - case compression_type::ZIP: { - zip_archive_s za; - if (OpenZipArchive(&za, raw, src.size())) { - size_t cdfh_ofs = 0; - for (int i = 0; i < za.eocd->num_entries; i++) { - auto const* cdfh = reinterpret_cast( - reinterpret_cast(za.cdfh) + cdfh_ofs); - int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len; - if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) { - // Bad cdir - break; - } - // For now, only accept with non-zero file sizes and DEFLATE - if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { - size_t lfh_ofs = cdfh->hdr_ofs; - auto const* lfh = reinterpret_cast(raw + lfh_ofs); - if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 && - lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { - if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { - size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; - size_t file_end = file_start + lfh->comp_size; - if (file_end <= src.size()) { - // Pick the first valid file of non-zero size (only 1 file expected in archive) - compression = compression_type::ZIP; - comp_data = raw + file_start; - comp_len = lfh->comp_size; - uncomp_len = lfh->uncomp_size; - break; - } - } - } - } - cdfh_ofs += cdfh_len; - } - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - } - case compression_type::BZIP2: - if (src.size() > 4) { - auto const* fhdr = reinterpret_cast(raw); - // Check for BZIP2 file signature "BZh1" to "BZh9" - if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && - fhdr->blksz >= '1' && fhdr->blksz <= '9') { - compression = compression_type::BZIP2; - comp_data = raw; - comp_len = src.size(); - uncomp_len = 0; - } - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - default: CUDF_FAIL("Unsupported compressed stream type"); - } - - CUDF_EXPECTS(comp_data != nullptr and comp_len > 0, "Unsupported compressed stream type"); - - if (uncomp_len <= 0) { - uncomp_len = comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume - // ~4:1 compression for initial size - } - - if (compression == compression_type::GZIP || compression == compression_type::ZIP) { - // INFLATE - std::vector dst(uncomp_len); - cpu_inflate_vector(dst, comp_data, comp_len); - return dst; - } - if (compression == compression_type::BZIP2) { - size_t src_ofs = 0; 
 /**
  * @brief ZLIB host decompressor (no header)
  */
@@ -421,7 +302,7 @@ size_t decompress_gzip(host_span<uint8_t const> src, host_span<uint8_t> dst)
 /**
  * @brief SNAPPY host decompressor
  */
-size_t decompress_snappy(host_span<uint8_t const> src, host_span<uint8_t> dst)
+size_t host_decompress_snappy(host_span<uint8_t const> src, host_span<uint8_t> dst)
 {
   CUDF_EXPECTS(not dst.empty() and src.size() >= 1, "invalid Snappy decompress inputs");
   uint32_t uncompressed_size, bytes_left, dst_pos;
@@ -502,6 +383,50 @@ size_t decompress_snappy(host_span<uint8_t const> src, host_span<uint8_t> dst)
   return uncompressed_size;
 }
 
+/**
+ * @brief SNAPPY device decompressor
+ */
+size_t decompress_snappy(host_span<uint8_t const> src,
+                         host_span<uint8_t> dst,
+                         rmm::cuda_stream_view stream)
+{
+  // Init device span of spans (source)
+  auto const d_src =
+    cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref());
+  auto hd_srcs = cudf::detail::hostdevice_vector<device_span<uint8_t const>>(1, stream);
+  hd_srcs[0]   = d_src;
+  hd_srcs.host_to_device_async(stream);
+
+  // Init device span of spans (temporary destination)
+  auto d_dst   = rmm::device_uvector<uint8_t>(dst.size(), stream);
+  auto hd_dsts = cudf::detail::hostdevice_vector<device_span<uint8_t>>(1, stream);
+  hd_dsts[0]   = d_dst;
+  hd_dsts.host_to_device_async(stream);
+
+  auto hd_stats = cudf::detail::hostdevice_vector<compression_result>(1, stream);
+  hd_stats[0]   = compression_result{0, compression_status::FAILURE};
+  hd_stats.host_to_device_async(stream);
+
+  auto const max_uncomp_page_size = dst.size();
+  nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
+                             hd_srcs,
+                             hd_dsts,
+                             hd_stats,
+                             max_uncomp_page_size,
+                             max_uncomp_page_size,
+                             stream);
+
+  hd_stats.device_to_host_sync(stream);
+  CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "SNAPPY decompression failed");
+
+  // Copy temporary output to `dst`
+  cudf::detail::cuda_memcpy(dst.subspan(0, hd_stats[0].bytes_written),
+                            device_span<uint8_t const>{d_dst.data(), hd_stats[0].bytes_written},
+                            stream);
+
+  return hd_stats[0].bytes_written;
+}
+
 /**
  * @brief ZSTD decompressor that uses nvcomp
  */
@@ -538,40 +463,43 @@ size_t decompress_zstd(host_span<uint8_t const> src,
   CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "ZSTD decompression failed");
 
   // Copy temporary output to `dst`
-  cudf::detail::cuda_memcpy_async(
-    dst.subspan(0, hd_stats[0].bytes_written),
-    device_span<uint8_t const>{d_dst.data(), hd_stats[0].bytes_written},
-    stream);
+  cudf::detail::cuda_memcpy(dst.subspan(0, hd_stats[0].bytes_written),
+                            device_span<uint8_t const>{d_dst.data(), hd_stats[0].bytes_written},
+                            stream);
 
   return hd_stats[0].bytes_written;
 }
 
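Both nvcomp-backed helpers above end with a synchronous cuda_memcpy rather than cuda_memcpy_async because `dst` is ordinary pageable host memory: the caller is entitled to read it as soon as the function returns. A minimal plain-CUDA sketch of the same reasoning; copy_to_host is a hypothetical helper, not cudf code:

    #include <cuda_runtime.h>
    #include <cstddef>

    // A device-to-pageable-host copy is only complete once the stream is
    // synchronized; returning before that would hand the caller stale bytes.
    void copy_to_host(void* h_dst, void const* d_src, size_t n, cudaStream_t s)
    {
      cudaMemcpyAsync(h_dst, d_src, n, cudaMemcpyDeviceToHost, s);
      cudaStreamSynchronize(s);  // without this, h_dst may not be ready yet
    }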
-      return decompress_snappy(src, dst);
-    case compression_type::ZSTD: return decompress_zstd(src, dst, stream);
-    default: CUDF_FAIL("Unsupported compression type");
-  }
-}
+struct source_properties {
+  compression_type compression = compression_type::NONE;
+  uint8_t const* comp_data     = nullptr;
+  size_t comp_len              = 0;
+  size_t uncomp_len            = 0;
+};
 
-size_t estimate_uncompressed_size(compression_type compression, host_span<uint8_t const> src)
+source_properties get_source_properties(compression_type compression, host_span<uint8_t const> src)
 {
-  auto raw = src.data();
+  auto raw                 = src.data();
+  uint8_t const* comp_data = nullptr;
+  size_t comp_len          = 0;
+  size_t uncomp_len        = 0;
+
   switch (compression) {
-    case compression_type::NONE: return src.size();
+    case compression_type::AUTO:
     case compression_type::GZIP: {
       gz_archive_s gz;
-      if (ParseGZArchive(&gz, src.data(), src.size())) return gz.isize;
+      if (ParseGZArchive(&gz, raw, src.size())) {
+        compression = compression_type::GZIP;
+        comp_data   = gz.comp_data;
+        comp_len    = gz.comp_len;
+        uncomp_len  = gz.isize;
+      }
+      if (compression != compression_type::AUTO) break;
+      [[fallthrough]];
     }
     case compression_type::ZIP: {
       zip_archive_s za;
-      if (OpenZipArchive(&za, src.data(), src.size())) {
+      if (OpenZipArchive(&za, raw, src.size())) {
         size_t cdfh_ofs = 0;
         for (int i = 0; i < za.eocd->num_entries; i++) {
           auto const* cdfh = reinterpret_cast<zip_cdfh_s const*>(
@@ -592,7 +520,11 @@ size_t estimate_uncompressed_size(compression_type compression, host_span<uint8
             size_t file_end = file_start + lfh->comp_size;
             if (file_end <= src.size()) {
               // Pick the first valid file of non-zero size (only 1 file expected in archive)
-              return lfh->uncomp_size;
+              compression = compression_type::ZIP;
+              comp_data   = raw + file_start;
+              comp_len    = lfh->comp_size;
+              uncomp_len  = lfh->uncomp_size;
+              break;
             }
           }
         }
@@ -600,28 +532,126 @@ size_t estimate_uncompressed_size(compression_type compression, host_span<uint8
           cdfh_ofs += cdfh_len;
         }
       }
+      if (compression != compression_type::AUTO) break;
+      [[fallthrough]];
+    }
+    case compression_type::BZIP2: {
+      if (src.size() > 4) {
+        auto const* fhdr = reinterpret_cast<bz2_file_header_s const*>(raw);
+        // Check for BZIP2 file signature "BZh1" to "BZh9"
+        if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' &&
+            fhdr->blksz >= '1' && fhdr->blksz <= '9') {
+          compression = compression_type::BZIP2;
+          comp_data   = raw;
+          comp_len    = src.size();
+          uncomp_len  = 0;
+        }
+      }
+      if (compression != compression_type::AUTO) break;
+      [[fallthrough]];
     }
     case compression_type::SNAPPY: {
-      uint32_t uncompressed_size;
+      uncomp_len     = 0;
       auto cur       = src.begin();
       auto const end = src.end();
       // Read uncompressed length (varint)
       {
-        uint32_t l = 0, c;
-        uncompressed_size = 0;
+        uint32_t l = 0, c;
         do {
           c              = *cur++;
           auto const lo7 = c & 0x7f;
-          if (l >= 28 && c > 0xf) { return 0; }
-          uncompressed_size |= lo7 << l;
+          if (l >= 28 && c > 0xf) {
+            uncomp_len = 0;
+            break;
+          }
+          uncomp_len |= lo7 << l;
           l += 7;
         } while (c > 0x7f && cur < end);
-        CUDF_EXPECTS(uncompressed_size != 0 and cur < end, "Destination buffer too small");
+        CUDF_EXPECTS(uncomp_len != 0 and cur < end, "Error in retrieving SNAPPY source properties");
       }
-      return uncompressed_size;
+      comp_data = raw;
+      comp_len  = src.size();
+      if (compression != compression_type::AUTO) break;
+      [[fallthrough]];
     }
-    default: return 0;
+    default: CUDF_FAIL("Unsupported compressed stream type");
   }
+
+  return source_properties{compression, comp_data, comp_len, uncomp_len};
+}
+
+size_t estimate_uncompressed_size(compression_type compression, host_span<uint8_t const> src)
+{
+  auto srcprops = get_source_properties(compression, src);
+  return srcprops.uncomp_len;
+}
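The SNAPPY branch of get_source_properties decodes a standard little-endian base-128 varint: seven payload bits per byte, low bits first, with the high bit as a continuation flag. The same decode as a self-contained sketch; the function is illustrative, not the cudf implementation:

    #include <cstddef>
    #include <cstdint>

    // Decode the varint that prefixes a raw Snappy stream. Returns the value
    // and sets *consumed to the bytes read; malformed input yields 0/0.
    uint32_t snappy_uncompressed_length(uint8_t const* p, size_t n, size_t* consumed)
    {
      uint32_t value = 0;
      uint32_t shift = 0;
      for (size_t i = 0; i < n; ++i) {
        uint8_t const byte = p[i];
        if (shift == 28 && byte > 0x0f) break;  // value would overflow 32 bits
        value |= static_cast<uint32_t>(byte & 0x7f) << shift;
        if ((byte & 0x80) == 0) {  // continuation bit clear: last byte
          *consumed = i + 1;
          return value;
        }
        shift += 7;
        if (shift > 28) break;  // more than 5 bytes cannot fit in uint32_t
      }
      *consumed = 0;
      return 0;
    }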
+
+size_t decompress(compression_type compression,
+                  host_span<uint8_t const> src,
+                  host_span<uint8_t> dst,
+                  rmm::cuda_stream_view stream)
+{
+  switch (compression) {
+    case compression_type::GZIP: return decompress_gzip(src, dst);
+    case compression_type::ZLIB: return decompress_zlib(src, dst);
+    case compression_type::SNAPPY: return decompress_snappy(src, dst, stream);
+    case compression_type::ZSTD: return decompress_zstd(src, dst, stream);
+    default: CUDF_FAIL("Unsupported compression type");
+  }
+}
+
+std::vector<uint8_t> decompress(compression_type compression, host_span<uint8_t const> src)
+{
+  CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr");
+  CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0");
+
+  auto srcprops = get_source_properties(compression, src);
+  CUDF_EXPECTS(srcprops.comp_data != nullptr and srcprops.comp_len > 0,
+               "Unsupported compressed stream type");
+
+  if (srcprops.uncomp_len <= 0) {
+    srcprops.uncomp_len =
+      srcprops.comp_len * 4 + 4096;  // In case uncompressed size isn't known in advance, assume
+                                     // ~4:1 compression for initial size
+  }
+
+  if (compression == compression_type::GZIP || compression == compression_type::ZIP) {
+    // INFLATE
+    std::vector<uint8_t> dst(srcprops.uncomp_len);
+    cpu_inflate_vector(dst, srcprops.comp_data, srcprops.comp_len);
+    return dst;
+  }
+  if (compression == compression_type::BZIP2) {
+    size_t src_ofs = 0;
+    size_t dst_ofs = 0;
+    int bz_err     = 0;
+    std::vector<uint8_t> dst(srcprops.uncomp_len);
+    do {
+      size_t dst_len = srcprops.uncomp_len - dst_ofs;
+      bz_err         = cpu_bz2_uncompress(
+        srcprops.comp_data, srcprops.comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs);
+      if (bz_err == BZ_OUTBUFF_FULL) {
+        // TBD: We could infer the compression ratio based on produced/consumed byte counts
+        // in order to minimize realloc events and over-allocation
+        dst_ofs = dst_len;
+        dst_len = srcprops.uncomp_len + (srcprops.uncomp_len / 2);
+        dst.resize(dst_len);
+        srcprops.uncomp_len = dst_len;
+      } else if (bz_err == 0) {
+        srcprops.uncomp_len = dst_len;
+        dst.resize(srcprops.uncomp_len);
+      }
+    } while (bz_err == BZ_OUTBUFF_FULL);
+    CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream");
+    return dst;
+  }
+  if (compression == compression_type::SNAPPY) {
+    std::vector<uint8_t> dst(srcprops.uncomp_len);
+    decompress_snappy(src, dst, cudf::get_default_stream());
+    return dst;
+  }
+
+  CUDF_FAIL("Unsupported compressed stream type");
+}
 
 }  // namespace io
diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp
index 982190eecb5..7a1d5e1b9ea 100644
--- a/cpp/src/io/json/read_json.hpp
+++ b/cpp/src/io/json/read_json.hpp
@@ -48,6 +48,7 @@ constexpr int max_subchunks_prealloced = 3;
  * @param compression Compression format of source
  * @param range_offset Number of bytes to skip from source start
  * @param range_size Number of bytes to read from source
+ * @param delimiter Delimiter character for JSONL inputs
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @returns A subspan of the input device span containing data read
  */
@@ -56,6 +57,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
                                    host_span<std::unique_ptr<datasource>> sources,
                                    compression_type compression,
                                    size_t range_offset,
                                    size_t range_size,
+                                   char delimiter,
                                    rmm::cuda_stream_view stream);
 
 /**
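The BZIP2 branch of the rebuilt decompress() keeps the original grow-and-retry strategy: start from the ~4:1 guess and resize geometrically while the decompressor reports a full output buffer. The same loop shape, reduced to a self-contained sketch in which the `step` callable stands in for cpu_bz2_uncompress's resumable interface:

    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <vector>

    enum class status { ok, output_full, error };

    // Retry with a geometrically grown output buffer until the resumable
    // decompressor no longer runs out of space, then trim to the bytes produced.
    std::vector<uint8_t> decompress_unknown_size(
      size_t initial_guess, std::function<status(std::vector<uint8_t>&, size_t&)> step)
    {
      std::vector<uint8_t> dst(initial_guess);
      size_t dst_ofs = 0;
      status st;
      do {
        st = step(dst, dst_ofs);
        if (st == status::output_full) {
          dst.resize(dst.size() + dst.size() / 2);  // grow ~1.5x and resume
        }
      } while (st == status::output_full);
      if (st == status::ok) { dst.resize(dst_ofs); }  // shrink to actual size
      return dst;
    }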

From 51a382eee159bb52584aa543de046d181b36947a Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Wed, 6 Nov 2024 12:07:53 +0000
Subject: [PATCH 11/14] update function name

---
 cpp/src/io/comp/uncomp.cpp   | 2 +-
 cpp/src/io/json/read_json.cu | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp
index 60d715fa5ce..443f9ae5cfd 100644
--- a/cpp/src/io/comp/uncomp.cpp
+++ b/cpp/src/io/comp/uncomp.cpp
@@ -580,7 +580,7 @@ source_properties get_source_properties(compression_type compression, host_span<
   return source_properties{compression, comp_data, comp_len, uncomp_len};
 }
 
-size_t estimate_uncompressed_size(compression_type compression, host_span<uint8_t const> src)
+size_t get_uncompressed_size(compression_type compression, host_span<uint8_t const> src)
 {
   auto srcprops = get_source_properties(compression, src);
   return srcprops.uncomp_len;
diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 8ce8117cddd..f6178b1e001 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -51,7 +51,7 @@ class compressed_host_buffer_source final : public datasource {
     _ch_buffer = std::vector<uint8_t>(dbuf->data(), dbuf->data() + dbuf->size());
     if (comptype == compression_type::GZIP || comptype == compression_type::ZIP ||
         comptype == compression_type::SNAPPY) {
-      _decompressed_ch_buffer_size = estimate_uncompressed_size(_comptype, _ch_buffer);
+      _decompressed_ch_buffer_size = get_uncompressed_size(_comptype, _ch_buffer);
       _decompressed_buffer.resize(0);
     } else {
       _decompressed_buffer = decompress(_comptype, _ch_buffer);

From 9ae4c786fcbe9b49b1dce5a2ffd31e933dc192e6 Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Wed, 6 Nov 2024 12:59:08 +0000
Subject: [PATCH 12/14] pr reviews

---
 cpp/src/io/comp/io_uncomp.hpp    |  2 +-
 cpp/src/io/json/read_json.cu     | 49 ++++++++++++++++++--------------
 cpp/src/io/json/read_json.hpp    |  1 -
 cpp/tests/io/json/json_utils.cuh |  1 -
 4 files changed, 28 insertions(+), 25 deletions(-)

diff --git a/cpp/src/io/comp/io_uncomp.hpp b/cpp/src/io/comp/io_uncomp.hpp
index d6ff2091c30..b6cf5502142 100644
--- a/cpp/src/io/comp/io_uncomp.hpp
+++ b/cpp/src/io/comp/io_uncomp.hpp
@@ -43,7 +43,7 @@ size_t decompress(compression_type compression,
                   host_span<uint8_t> dst,
                   rmm::cuda_stream_view stream);
 
-size_t estimate_uncompressed_size(compression_type compression, host_span<uint8_t const> src);
+size_t get_uncompressed_size(compression_type compression, host_span<uint8_t const> src);
 
 /**
  * @brief GZIP header flags
  * See https://tools.ietf.org/html/rfc1952
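The rename also clarifies the contract: for GZIP, ZIP, and SNAPPY the returned size is read directly from the stream, not estimated, so a caller can presize its destination and decompress in one pass. A sketch of that call sequence, assuming a cudf build context and the cudf::io namespace placement seen in uncomp.cpp; the helper name is hypothetical:

    #include <cstdint>
    #include <vector>
    // cudf-internal headers assumed available in-tree:
    // #include "io/comp/io_uncomp.hpp"

    std::vector<uint8_t> decompress_whole(cudf::io::compression_type ctype,
                                          cudf::host_span<uint8_t const> comp,
                                          rmm::cuda_stream_view stream)
    {
      // Exact for GZIP/ZIP/SNAPPY, where the size is recorded in the stream.
      auto const uncomp_size = cudf::io::get_uncompressed_size(ctype, comp);
      std::vector<uint8_t> out(uncomp_size);
      cudf::io::decompress(ctype, comp, cudf::host_span<uint8_t>(out.data(), out.size()), stream);
      return out;
    }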
diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 516ad911922..f3782d87ddf 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -53,7 +53,6 @@ class compressed_host_buffer_source final : public datasource {
     if (comptype == compression_type::GZIP || comptype == compression_type::ZIP ||
         comptype == compression_type::SNAPPY) {
       _decompressed_ch_buffer_size = get_uncompressed_size(_comptype, _ch_buffer);
-      _decompressed_buffer.resize(0);
     } else {
       _decompressed_buffer         = decompress(_comptype, _ch_buffer);
       _decompressed_ch_buffer_size = _decompressed_buffer.size();
@@ -62,9 +61,18 @@ class compressed_host_buffer_source final : public datasource {
 
   size_t host_read(size_t offset, size_t size, uint8_t* dst) override
   {
-    auto decompressed_hbuf = decompress(_comptype, _ch_buffer);
-    auto const count       = std::min(size, decompressed_hbuf.size() - offset);
-    std::memcpy(dst, decompressed_hbuf.data() + offset, count);
+    if (_decompressed_buffer.empty()) {
+      auto decompressed_hbuf = decompress(_comptype, _ch_buffer);
+      auto const count       = std::min(size, decompressed_hbuf.size() - offset);
+      bool partial_read      = offset + count < decompressed_hbuf.size();
+      if (!partial_read) {
+        std::memcpy(dst, decompressed_hbuf.data() + offset, count);
+        return count;
+      }
+      _decompressed_buffer = std::move(decompressed_hbuf);
+    }
+    auto const count = std::min(size, _decompressed_buffer.size() - offset);
+    std::memcpy(dst, _decompressed_buffer.data() + offset, count);
     return count;
   }
 
@@ -178,13 +186,12 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
 {
   CUDF_FUNC_RANGE();
 
-  std::size_t const total_source_size       = sources_size(sources, 0, 0);
-  auto constexpr num_delimiter_chars        = 1;
-  auto const delimiter                      = reader_opts.get_delimiter();
-  auto const num_extra_delimiters           = num_delimiter_chars * sources.size();
-  compression_type const reader_compression = reader_opts.get_compression();
-  std::size_t const chunk_offset            = reader_opts.get_byte_range_offset();
-  std::size_t chunk_size                    = reader_opts.get_byte_range_size();
+  std::size_t const total_source_size = sources_size(sources, 0, 0);
+  auto constexpr num_delimiter_chars  = 1;
+  auto const delimiter                = reader_opts.get_delimiter();
+  auto const num_extra_delimiters     = num_delimiter_chars * sources.size();
+  std::size_t const chunk_offset      = reader_opts.get_byte_range_offset();
+  std::size_t chunk_size              = reader_opts.get_byte_range_size();
 
   CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
                "Invalid offsetting",
@@ -203,8 +210,8 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
   // Offset within buffer indicating first read position
   std::int64_t buffer_offset = 0;
 
-  auto readbufspan = ingest_raw_input(
-    bufspan, sources, reader_compression, chunk_offset, chunk_size, delimiter, stream);
+  auto readbufspan =
+    ingest_raw_input(bufspan, sources, chunk_offset, chunk_size, delimiter, stream);
 
   auto const shift_for_nonzero_offset = std::min<std::int64_t>(chunk_offset, 1);
   auto const first_delim_pos          =
@@ -225,7 +232,6 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
       buffer_offset += readbufspan.size();
       readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset),
                                      sources,
-                                     reader_compression,
                                      next_subchunk_start,
                                      size_per_subchunk,
                                      delimiter,
@@ -301,10 +307,10 @@ table_with_metadata read_batch(host_span<std::unique_ptr<datasource>> sources,
   return device_parse_nested_json(buffer, reader_opts, stream, mr);
 }
 
-table_with_metadata create_batched_cudf_table(host_span<std::unique_ptr<datasource>> sources,
-                                              json_reader_options const& reader_opts,
-                                              rmm::cuda_stream_view stream,
-                                              rmm::device_async_resource_ref mr)
+table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> sources,
+                                   json_reader_options const& reader_opts,
+                                   rmm::cuda_stream_view stream,
+                                   rmm::device_async_resource_ref mr)
 {
   /*
    * The batched JSON reader enforces that the size of each batch is at most INT_MAX
@@ -409,7 +415,6 @@ table_with_metadata create_batched_cudf_table(host_span<std::unique_ptr<datasour
 device_span<char> ingest_raw_input(device_span<char> buffer,
                                    host_span<std::unique_ptr<datasource>> sources,
-                                   compression_type compression,
                                    std::size_t range_offset,
                                    std::size_t range_size,
                                    char delimiter,
@@ -491,16 +496,16 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
   }
 
   if (reader_opts.get_compression() == compression_type::NONE)
-    return create_batched_cudf_table(sources, reader_opts, stream, mr);
+    return read_json_impl(sources, reader_opts, stream, mr);
 
   std::vector<std::unique_ptr<datasource>> compressed_sources;
   for (size_t i = 0; i < sources.size(); i++) {
     compressed_sources.emplace_back(std::make_unique<compressed_host_buffer_source>(
       std::move(sources[i]), reader_opts.get_compression()));
   }
-  // in create_batched_cudf_table, we need the compressed source size to actually be the
+  // in read_json_impl, we need the compressed source size to actually be the
   // uncompressed source size for correct batching
-  return create_batched_cudf_table(compressed_sources, reader_opts, stream, mr);
+  return read_json_impl(compressed_sources, reader_opts, stream, mr);
 }
 
 }  // namespace cudf::io::json::detail
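The host_read rewrite above decompresses lazily and caches the full buffer only when a caller asks for a partial range, on the reasoning that a partial read implies more reads are coming. The policy distilled into a standalone sketch; decompress_all is a stand-in for decompress(_comptype, ...):

    #include <algorithm>
    #include <cstdint>
    #include <cstring>
    #include <functional>
    #include <vector>

    struct lazy_decompress_cache {
      std::function<std::vector<uint8_t>()> decompress_all;  // stand-in producer
      std::vector<uint8_t> cache;

      size_t read(size_t offset, size_t size, uint8_t* dst)
      {
        if (cache.empty()) {
          auto full        = decompress_all();
          auto const count = std::min(size, full.size() - offset);
          if (offset + count == full.size()) {  // read reaches the end: serve and drop
            std::memcpy(dst, full.data() + offset, count);
            return count;
          }
          cache = std::move(full);  // partial read: keep the buffer for later calls
        }
        auto const count = std::min(size, cache.size() - offset);
        std::memcpy(dst, cache.data() + offset, count);
        return count;
      }
    };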
diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp
index 7a1d5e1b9ea..82333fdbdc0 100644
--- a/cpp/src/io/json/read_json.hpp
+++ b/cpp/src/io/json/read_json.hpp
@@ -54,7 +54,6 @@ constexpr int max_subchunks_prealloced = 3;
  */
 device_span<char> ingest_raw_input(device_span<char> buffer,
                                    host_span<std::unique_ptr<datasource>> sources,
-                                   compression_type compression,
                                    size_t range_offset,
                                    size_t range_size,
                                    char delimiter,
diff --git a/cpp/tests/io/json/json_utils.cuh b/cpp/tests/io/json/json_utils.cuh
index c31bb2d24e0..d2831b4f8d3 100644
--- a/cpp/tests/io/json/json_utils.cuh
+++ b/cpp/tests/io/json/json_utils.cuh
@@ -49,7 +49,6 @@ std::vector<cudf::io::table_with_metadata> split_byte_range_reading(
   rmm::device_uvector<char> buffer(total_source_size, stream);
   auto readbufspan = cudf::io::json::detail::ingest_raw_input(buffer,
                                                               sources,
-                                                              reader_opts.get_compression(),
                                                               reader_opts.get_byte_range_offset(),
                                                               reader_opts.get_byte_range_size(),
                                                               reader_opts.get_delimiter(),
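Dropping the compression parameter from ingest_raw_input works because compressed inputs never reach it anymore: read_json wraps each source in a decompressing datasource first, so downstream code only ever sees plain bytes. A sketch of that wrapping step, assuming the types named in the diffs above rather than quoting them verbatim:

    #include <memory>
    #include <vector>
    // cudf build context assumed; compressed_host_buffer_source is the
    // file-local wrapper defined in read_json.cu above.

    std::vector<std::unique_ptr<cudf::io::datasource>> wrap_compressed_sources(
      cudf::host_span<std::unique_ptr<cudf::io::datasource>> sources,
      cudf::io::compression_type ctype)
    {
      std::vector<std::unique_ptr<cudf::io::datasource>> wrapped;
      wrapped.reserve(sources.size());
      for (auto& src : sources) {
        wrapped.emplace_back(std::make_unique<compressed_host_buffer_source>(src, ctype));
      }
      return wrapped;
    }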

From a6faf3df1c1ca7c6e5fa53517186ea58213081a5 Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Wed, 6 Nov 2024 14:07:35 +0000
Subject: [PATCH 13/14] storing compressed buf as datasource buf

---
 cpp/src/io/json/read_json.cu | 27 ++++++++++++++++-----------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index f3782d87ddf..40a82fe490e 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -45,24 +45,27 @@ namespace {
 
 class compressed_host_buffer_source final : public datasource {
  public:
-  explicit compressed_host_buffer_source(std::unique_ptr<datasource> src, compression_type comptype)
-    : _comptype{comptype}
+  explicit compressed_host_buffer_source(std::unique_ptr<datasource> const& src,
+                                         compression_type comptype)
+    : _comptype{comptype}, _dbuf_ptr{src->host_read(0, src->size())}
   {
-    auto dbuf  = src->host_read(0, src->size());
-    _ch_buffer = std::vector<uint8_t>(dbuf->data(), dbuf->data() + dbuf->size());
+    auto ch_buffer = host_span<uint8_t const>(reinterpret_cast<uint8_t const*>(_dbuf_ptr->data()),
+                                              _dbuf_ptr->size());
     if (comptype == compression_type::GZIP || comptype == compression_type::ZIP ||
         comptype == compression_type::SNAPPY) {
-      _decompressed_ch_buffer_size = get_uncompressed_size(_comptype, _ch_buffer);
+      _decompressed_ch_buffer_size = get_uncompressed_size(_comptype, ch_buffer);
     } else {
-      _decompressed_buffer         = decompress(_comptype, _ch_buffer);
+      _decompressed_buffer         = decompress(_comptype, ch_buffer);
       _decompressed_ch_buffer_size = _decompressed_buffer.size();
     }
   }
 
   size_t host_read(size_t offset, size_t size, uint8_t* dst) override
   {
+    auto ch_buffer = host_span<uint8_t const>(reinterpret_cast<uint8_t const*>(_dbuf_ptr->data()),
+                                              _dbuf_ptr->size());
     if (_decompressed_buffer.empty()) {
-      auto decompressed_hbuf = decompress(_comptype, _ch_buffer);
+      auto decompressed_hbuf = decompress(_comptype, ch_buffer);
       auto const count       = std::min(size, decompressed_hbuf.size() - offset);
       bool partial_read      = offset + count < decompressed_hbuf.size();
       if (!partial_read) {
@@ -78,8 +81,10 @@ class compressed_host_buffer_source final : public datasource {
 
   std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
   {
+    auto ch_buffer = host_span<uint8_t const>(reinterpret_cast<uint8_t const*>(_dbuf_ptr->data()),
+                                              _dbuf_ptr->size());
     if (_decompressed_buffer.empty()) {
-      auto decompressed_hbuf = decompress(_comptype, _ch_buffer);
+      auto decompressed_hbuf = decompress(_comptype, ch_buffer);
       auto const count       = std::min(size, decompressed_hbuf.size() - offset);
       bool partial_read      = offset + count < decompressed_hbuf.size();
       if (!partial_read)
@@ -96,7 +101,7 @@ class compressed_host_buffer_source final : public datasource {
   [[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; }
 
  private:
-  std::vector<uint8_t> _ch_buffer;
+  std::unique_ptr<datasource::buffer> _dbuf_ptr;
   compression_type _comptype;
   size_t _decompressed_ch_buffer_size;
   std::vector<uint8_t> _decompressed_buffer;
@@ -500,8 +505,8 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
 
   std::vector<std::unique_ptr<datasource>> compressed_sources;
   for (size_t i = 0; i < sources.size(); i++) {
-    compressed_sources.emplace_back(std::make_unique<compressed_host_buffer_source>(
-      std::move(sources[i]), reader_opts.get_compression()));
+    compressed_sources.emplace_back(
+      std::make_unique<compressed_host_buffer_source>(sources[i], reader_opts.get_compression()));
   }
   // in read_json_impl, we need the compressed source size to actually be the
   // uncompressed source size for correct batching

From 92facb5460e0f6b3b588aeb3609a66f7d8351558 Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Fri, 8 Nov 2024 15:23:54 +0000
Subject: [PATCH 14/14] pr reviews

---
 cpp/src/io/comp/uncomp.cpp    | 50 +++--------------------------------
 cpp/src/io/json/read_json.hpp |  8 +++---
 2 files changed, 6 insertions(+), 52 deletions(-)

diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp
index 443f9ae5cfd..1a5e5d3d3b3 100644
--- a/cpp/src/io/comp/uncomp.cpp
+++ b/cpp/src/io/comp/uncomp.cpp
@@ -302,7 +302,7 @@ size_t decompress_gzip(host_span<uint8_t const> src, host_span<uint8_t> dst)
 /**
  * @brief SNAPPY host decompressor
  */
-size_t host_decompress_snappy(host_span<uint8_t const> src, host_span<uint8_t> dst)
+size_t decompress_snappy(host_span<uint8_t const> src, host_span<uint8_t> dst)
 {
   CUDF_EXPECTS(not dst.empty() and src.size() >= 1, "invalid Snappy decompress inputs");
   uint32_t uncompressed_size, bytes_left, dst_pos;
@@ -383,50 +383,6 @@ size_t host_decompress_snappy(host_span<uint8_t const> src, host_span<uint8_t> d
   return uncompressed_size;
 }
 
-/**
- * @brief SNAPPY device decompressor
- */
-size_t decompress_snappy(host_span<uint8_t const> src,
-                         host_span<uint8_t> dst,
-                         rmm::cuda_stream_view stream)
-{
-  // Init device span of spans (source)
-  auto const d_src =
-    cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref());
-  auto hd_srcs = cudf::detail::hostdevice_vector<device_span<uint8_t const>>(1, stream);
-  hd_srcs[0]   = d_src;
-  hd_srcs.host_to_device_async(stream);
-
-  // Init device span of spans (temporary destination)
-  auto d_dst   = rmm::device_uvector<uint8_t>(dst.size(), stream);
-  auto hd_dsts = cudf::detail::hostdevice_vector<device_span<uint8_t>>(1, stream);
-  hd_dsts[0]   = d_dst;
-  hd_dsts.host_to_device_async(stream);
-
-  auto hd_stats = cudf::detail::hostdevice_vector<compression_result>(1, stream);
-  hd_stats[0]   = compression_result{0, compression_status::FAILURE};
-  hd_stats.host_to_device_async(stream);
-
-  auto const max_uncomp_page_size = dst.size();
-  nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
-                             hd_srcs,
-                             hd_dsts,
-                             hd_stats,
-                             max_uncomp_page_size,
-                             max_uncomp_page_size,
-                             stream);
-
-  hd_stats.device_to_host_sync(stream);
-  CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "SNAPPY decompression failed");
-
-  // Copy temporary output to `dst`
-  cudf::detail::cuda_memcpy(dst.subspan(0, hd_stats[0].bytes_written),
-                            device_span<uint8_t const>{d_dst.data(), hd_stats[0].bytes_written},
-                            stream);
-
-  return hd_stats[0].bytes_written;
-}
-
 /**
  * @brief ZSTD decompressor that uses nvcomp
  */
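The motivation for [PATCH 13/14] is allocation behavior: keeping the datasource::buffer returned by host_read avoids the extra full copy that constructing a std::vector from it incurred, and host_span views over it cost nothing. The difference, reduced to stand-in types so it is self-contained:

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    using owning_buffer = std::vector<uint8_t>;  // stands in for datasource::buffer

    struct byte_view {  // stands in for host_span<uint8_t const>
      uint8_t const* data;
      std::size_t size;
    };

    // Before: the compressed bytes were copied into a second owning container.
    owning_buffer copy_based(owning_buffer const& dbuf) { return owning_buffer(dbuf); }

    // After: the source keeps dbuf itself and hands out free non-owning views.
    byte_view view_based(owning_buffer const& dbuf) { return {dbuf.data(), dbuf.size()}; }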
@@ -594,7 +550,7 @@ size_t decompress(compression_type compression,
   switch (compression) {
     case compression_type::GZIP: return decompress_gzip(src, dst);
     case compression_type::ZLIB: return decompress_zlib(src, dst);
-    case compression_type::SNAPPY: return decompress_snappy(src, dst, stream);
+    case compression_type::SNAPPY: return decompress_snappy(src, dst);
     case compression_type::ZSTD: return decompress_zstd(src, dst, stream);
     default: CUDF_FAIL("Unsupported compression type");
   }
 }
@@ -647,7 +603,7 @@ std::vector<uint8_t> decompress(compression_type compression, host_span<uint8
   if (compression == compression_type::SNAPPY) {
     std::vector<uint8_t> dst(srcprops.uncomp_len);
-    decompress_snappy(src, dst, cudf::get_default_stream());
+    decompress_snappy(src, dst);
     return dst;
   }
 
diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp
index 82333fdbdc0..ac980938522 100644
--- a/cpp/src/io/json/read_json.hpp
+++ b/cpp/src/io/json/read_json.hpp
@@ -32,10 +32,9 @@ namespace CUDF_EXPORT cudf {
 namespace io::json::detail {
 
 // Some magic numbers
-constexpr int num_subchunks               = 10;  // per chunk_size
-constexpr size_t min_subchunk_size        = 10000;
-constexpr int estimated_compression_ratio = 4;
-constexpr int max_subchunks_prealloced    = 3;
+constexpr int num_subchunks            = 10;  // per chunk_size
+constexpr size_t min_subchunk_size     = 10000;
+constexpr int max_subchunks_prealloced = 3;
 
 /**
  * @brief Read from array of data sources into RMM buffer. The size of the returned device span
@@ -45,7 +44,6 @@ constexpr int max_subchunks_prealloced = 3;
  *
  * @param buffer Device span buffer to which data is read
  * @param sources Array of data sources
- * @param compression Compression format of source
 * @param range_offset Number of bytes to skip from source start
 * @param range_size Number of bytes to read from source
 * @param delimiter Delimiter character for JSONL inputs
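The estimated_compression_ratio constant can be removed because its only remaining consumer is the inline comp_len * 4 + 4096 fallback inside uncomp.cpp's decompress(), used when the stream does not record its uncompressed size. As a named helper the heuristic would look like the sketch below; this is illustrative only, since the patch deliberately keeps the expression inline:

    #include <cstddef>

    // Initial allocation guess when the uncompressed size is unknown:
    // assume roughly 4:1 compression, plus slack so tiny inputs still fit.
    constexpr std::size_t initial_uncompressed_guess(std::size_t comp_len)
    {
      return comp_len * 4 + 4096;
    }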