diff --git a/.github/workflows/auto-merge.yml b/.github/workflows/auto-merge.yml
index b1c3c2b32b..8a71a90f3b 100755
--- a/.github/workflows/auto-merge.yml
+++ b/.github/workflows/auto-merge.yml
@@ -18,12 +18,12 @@ name: auto-merge HEAD to BASE
on:
pull_request_target:
branches:
- - branch-24.06
+ - branch-24.08
types: [closed]
env:
- HEAD: branch-24.06
- BASE: branch-24.08
+ HEAD: branch-24.08
+ BASE: branch-24.10
jobs:
auto-merge:
diff --git a/.github/workflows/blossom-ci.yml b/.github/workflows/blossom-ci.yml
index 33ccf50ea8..a17fbddd83 100644
--- a/.github/workflows/blossom-ci.yml
+++ b/.github/workflows/blossom-ci.yml
@@ -33,40 +33,43 @@ jobs:
args: ${{ env.args }}
# This job only runs for pull request comments
- if: contains( '\
- abellina,\
- anfeng,\
- firestarman,\
- GaryShen2008,\
- jlowe,\
- mythrocks,\
- nartal1,\
- nvdbaranec,\
- NvTimLiu,\
- razajafri,\
- revans2,\
- rwlee,\
- sameerz,\
- tgravescs,\
- wbo4958,\
- wjxiz1992,\
- sperlingxx,\
- YanxuanLiu,\
- hyperbolic2346,\
- gerashegalov,\
- ttnghia,\
- nvliyuan,\
- res-life,\
- HaoYang670,\
- NVnavkumar,\
- yinqingh,\
- thirtiseven,\
- parthosa,\
- liurenjie1024,\
- binmahone,\
- pmattione-nvidia,\
- Feng-Jiang28,\
- ', format('{0},', github.actor)) && github.event.comment.body == 'build'
+ if: |
+ github.event.comment.body == 'build' &&
+ (
+ github.actor == 'abellina' ||
+ github.actor == 'anfeng' ||
+ github.actor == 'firestarman' ||
+ github.actor == 'GaryShen2008' ||
+ github.actor == 'jlowe' ||
+ github.actor == 'mythrocks' ||
+ github.actor == 'nartal1' ||
+ github.actor == 'nvdbaranec' ||
+ github.actor == 'NvTimLiu' ||
+ github.actor == 'razajafri' ||
+ github.actor == 'revans2' ||
+ github.actor == 'rwlee' ||
+ github.actor == 'sameerz' ||
+ github.actor == 'tgravescs' ||
+ github.actor == 'wbo4958' ||
+ github.actor == 'wjxiz1992' ||
+ github.actor == 'sperlingxx' ||
+ github.actor == 'YanxuanLiu' ||
+ github.actor == 'hyperbolic2346' ||
+ github.actor == 'gerashegalov' ||
+ github.actor == 'ttnghia' ||
+ github.actor == 'nvliyuan' ||
+ github.actor == 'res-life' ||
+ github.actor == 'HaoYang670' ||
+ github.actor == 'NVnavkumar' ||
+ github.actor == 'yinqingh' ||
+ github.actor == 'thirtiseven' ||
+ github.actor == 'parthosa' ||
+ github.actor == 'liurenjie1024' ||
+ github.actor == 'binmahone' ||
+ github.actor == 'pmattione-nvidia' ||
+ github.actor == 'Feng-Jiang28' ||
+ github.actor == 'pxLi'
+ )
steps:
- name: Check if comment is issued by authorized person
run: blossom-ci
@@ -81,7 +84,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
with:
repository: ${{ fromJson(needs.Authorization.outputs.args).repo }}
ref: ${{ fromJson(needs.Authorization.outputs.args).ref }}
@@ -89,7 +92,7 @@ jobs:
# repo specific steps
- name: Setup java
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
distribution: adopt
java-version: 8
diff --git a/.gitmodules b/.gitmodules
index 12b07c5b18..862e1ef3e6 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
[submodule "thirdparty/cudf"]
path = thirdparty/cudf
url = https://github.com/rapidsai/cudf.git
- branch = branch-24.06
+ branch = branch-24.08
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 1ada0b474b..7f83e2169b 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -61,15 +61,21 @@ the Docker container.
The script passes all of its arguments onto the Maven command run inside the Docker container,
so it should be invoked as one would invoke Maven, e.g.: `build/build-in-docker clean package`
+#### Using spark-rapids-jni Docker Container with other Repos
+
+The Spark RAPIDS project spans multiple repos. Some issues are discovered in
+spark-rapids-jni, but they need to be made easily reproducible in the cudf repo.
+
+To this end, export `WORKDIR` with the path pointing to a different repo:
+
+```
+export WORKDIR=~/gits/rapidsai/cudf
+~/gits/NVIDIA/spark-rapids-jni/build/run-in-docker head README.md
+```
+
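+The invoked command runs with the other repo as its working directory. As another
+example (a sketch, assuming a local cudf checkout at the path below and that cudf's
+`build.sh` supports the `libcudf` target), the same pattern can drive a cudf build
+inside the container:
+
+```
+export WORKDIR=~/gits/rapidsai/cudf
+~/gits/NVIDIA/spark-rapids-jni/build/run-in-docker ./build.sh libcudf
+```
+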
### cudf Submodule and Build
[RAPIDS cuDF](https://github.com/rapidsai/cudf) is being used as a submodule in this project.
-Due to the lengthy build of libcudf, it is **not cleaned** during a normal Maven clean phase
-unless built using `build/build-in-docker`. `build/build-in-docker` uses `ccache` by default
-unless CCACHE_DISABLE=1 is set in the environment.
-
-`-Dlibcudf.clean.skip=false` can also be specified on the Maven command-line to force
-libcudf to be cleaned during the Maven clean phase.
Currently libcudf is only configured once and the build relies on cmake to re-configure as needed.
This is because libcudf currently is rebuilding almost entirely when it is configured with the same
@@ -93,7 +99,6 @@ to control aspects of the build:
| `BUILD_BENCHMARKS` | Compile benchmarks | OFF |
| `BUILD_FAULTINJ` | Compile fault injection | ON |
| `libcudf.build.configure` | Force libcudf build to configure | false |
-| `libcudf.clean.skip` | Whether to skip cleaning libcudf build | true |
| `submodule.check.skip` | Whether to skip checking git submodules | false |
@@ -160,7 +165,7 @@ $ ./build/build-in-docker install ...
```
Now cd to ~/repos/NVIDIA/spark-rapids and build with one of the options from
-[spark-rapids instructions](https://github.com/NVIDIA/spark-rapids/blob/branch-24.06/CONTRIBUTING.md#building-from-source).
+[spark-rapids instructions](https://github.com/NVIDIA/spark-rapids/blob/branch-24.08/CONTRIBUTING.md#building-from-source).
```bash
$ ./build/buildall
diff --git a/build-libcudf.xml b/build-libcudf.xml
deleted file mode 100644
index 765f50e8a0..0000000000
--- a/build-libcudf.xml
+++ /dev/null
@@ -1,65 +0,0 @@
- Configures and builds the libcudf library from the cudf submodule.
diff --git a/build/run-in-docker b/build/run-in-docker
index 81152a1d9d..912feaa03c 100755
--- a/build/run-in-docker
+++ b/build/run-in-docker
@@ -20,9 +20,11 @@
set -e
-# Base paths relative to this script's location
-SCRIPTDIR=$(cd $(dirname $0); pwd)
-REPODIR=$SCRIPTDIR/..
+REPODIR_REL=$(git rev-parse --show-toplevel)
+REPODIR=$(realpath "$REPODIR_REL")
+GIT_COMMON_DIR_REL=$(git rev-parse --git-common-dir)
+GIT_COMMON_DIR=$(realpath "$GIT_COMMON_DIR_REL")
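+# The git common dir is resolved separately so it can be bind-mounted below;
+# this keeps git usable inside the container when WORKDIR is a linked worktree
+# or a different repo than this one.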
+WORKDIR=${WORKDIR:-$REPODIR}
CUDA_VERSION=${CUDA_VERSION:-11.8.0}
DOCKER_CMD=${DOCKER_CMD:-docker}
@@ -63,11 +65,11 @@ $DOCKER_CMD run $DOCKER_GPU_OPTS $DOCKER_RUN_EXTRA_ARGS -u $(id -u):$(id -g) --r
-v "/etc/passwd:/etc/passwd:ro" \
-v "/etc/shadow:/etc/shadow:ro" \
-v "/etc/sudoers.d:/etc/sudoers.d:ro" \
- -v "$REPODIR:$REPODIR:rw" \
+ -v "$GIT_COMMON_DIR:$GIT_COMMON_DIR:rw" \
+ -v "$WORKDIR:$WORKDIR:rw" \
-v "$LOCAL_CCACHE_DIR:$LOCAL_CCACHE_DIR:rw" \
-v "$LOCAL_MAVEN_REPO:$LOCAL_MAVEN_REPO:rw" \
- --workdir "$REPODIR" \
- -e CCACHE_DISABLE \
+ --workdir "$WORKDIR" \
-e CCACHE_DIR="$LOCAL_CCACHE_DIR" \
-e CMAKE_C_COMPILER_LAUNCHER="ccache" \
-e CMAKE_CXX_COMPILER_LAUNCHER="ccache" \
diff --git a/ci/Jenkinsfile.premerge b/ci/Jenkinsfile.premerge
index 0a00eb6f1b..fb7c3cd0de 100644
--- a/ci/Jenkinsfile.premerge
+++ b/ci/Jenkinsfile.premerge
@@ -57,7 +57,8 @@ pipeline {
parameters {
string(name: 'PARALLEL_LEVEL', defaultValue: '18',
description: 'Parallel build cudf cpp with -DCPP_PARALLEL_LEVEL')
- string(name: 'REF', defaultValue: '',
+        // Put a default value for REF to avoid errors when running the pipeline manually
+ string(name: 'REF', defaultValue: 'main',
description: 'Merged commit of specific PR')
string(name: 'GITHUB_DATA', defaultValue: '',
description: 'Json-formatted github data from upstream blossom-ci')
diff --git a/ci/check-cuda-dependencies.sh b/ci/check-cuda-dependencies.sh
new file mode 100644
index 0000000000..9d988bedae
--- /dev/null
+++ b/ci/check-cuda-dependencies.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+#
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Common script to check whether the packaged *.so files have a dynamic link to the CUDA Runtime
+
+set -exo pipefail
+
+jar_path=$1
+tmp_path=/tmp/"jni-$(date "+%Y%m%d%H%M%S")"
+unzip -j "${jar_path}" "*64/Linux/*.so" -d "${tmp_path}"
+
+find "$tmp_path" -type f -name "*.so" | while read -r so_file; do
+ # Check if *.so file has a dynamic link to CUDA Runtime
+ if objdump -p "$so_file" | grep NEEDED | grep -qi cudart; then
+ echo "Dynamic link to CUDA Runtime found in $so_file..."
+ ldd "$so_file"
+ exit 1
+ else
+ echo "No dynamic link to CUDA Runtime found in $so_file"
+ fi
+done
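+
+# Example invocation (hypothetical jar path):
+#   ci/check-cuda-dependencies.sh target/spark-rapids-jni-24.08.0-cuda11.jar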
diff --git a/ci/nightly-build.sh b/ci/nightly-build.sh
index 8a3c2dbacf..267b26efdf 100755
--- a/ci/nightly-build.sh
+++ b/ci/nightly-build.sh
@@ -1,6 +1,6 @@
#!/bin/bash
#
-# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@ USE_GDS=${USE_GDS:-ON}
USE_SANITIZER=${USE_SANITIZER:-ON}
BUILD_FAULTINJ=${BUILD_FAULTINJ:-ON}
ARM64=${ARM64:-false}
+artifact_suffix="${CUDA_VER}"
profiles="source-javadoc"
if [ "${ARM64}" == "true" ]; then
@@ -36,6 +37,7 @@ if [ "${ARM64}" == "true" ]; then
USE_GDS="OFF"
USE_SANITIZER="ON"
BUILD_FAULTINJ="OFF"
+ artifact_suffix="${artifact_suffix}-arm64"
fi
${MVN} clean package ${MVN_MIRROR} \
@@ -43,5 +45,8 @@ ${MVN} clean package ${MVN_MIRROR} \
-DCPP_PARALLEL_LEVEL=${PARALLEL_LEVEL} \
-Dlibcudf.build.configure=true \
-DUSE_GDS=${USE_GDS} -Dtest=*,!CuFileTest,!CudaFatalTest,!ColumnViewNonEmptyNullsTest \
- -DBUILD_TESTS=ON -DBUILD_FAULTINJ=${BUILD_FAULTINJ} -Dcuda.version=$CUDA_VER \
+ -DBUILD_TESTS=ON -DBUILD_BENCHMARKS=ON -DBUILD_FAULTINJ=${BUILD_FAULTINJ} -Dcuda.version=$CUDA_VER \
-DUSE_SANITIZER=${USE_SANITIZER}
+
+build_name=$(${MVN} help:evaluate -Dexpression=project.build.finalName -q -DforceStdout)
+. ci/check-cuda-dependencies.sh "target/${build_name}-${artifact_suffix}.jar"
diff --git a/ci/premerge-build.sh b/ci/premerge-build.sh
index e3adc10b3e..7297a9ecdc 100755
--- a/ci/premerge-build.sh
+++ b/ci/premerge-build.sh
@@ -1,6 +1,6 @@
#!/bin/bash
#
-# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -27,4 +27,8 @@ ${MVN} verify ${MVN_MIRROR} \
-DCPP_PARALLEL_LEVEL=${PARALLEL_LEVEL} \
-Dlibcudf.build.configure=true \
-DUSE_GDS=ON -Dtest=*,!CuFileTest,!CudaFatalTest,!ColumnViewNonEmptyNullsTest \
- -DBUILD_TESTS=ON
+ -DBUILD_TESTS=ON -DBUILD_BENCHMARKS=ON
+
+build_name=$(${MVN} help:evaluate -Dexpression=project.build.finalName -q -DforceStdout)
+cuda_version=$(${MVN} help:evaluate -Dexpression=cuda.version -q -DforceStdout)
+. ci/check-cuda-dependencies.sh "target/${build_name}-${cuda_version}.jar"
diff --git a/ci/submodule-sync.sh b/ci/submodule-sync.sh
index 1888696ba5..a889d86eb0 100755
--- a/ci/submodule-sync.sh
+++ b/ci/submodule-sync.sh
@@ -88,8 +88,13 @@ else
echo "Test failed, will update the result"
fi
+build_name=$(${MVN} help:evaluate -Dexpression=project.build.finalName -q -DforceStdout)
+cuda_version=$(${MVN} help:evaluate -Dexpression=cuda.version -q -DforceStdout)
+. ci/check-cuda-dependencies.sh "target/${build_name}-${cuda_version}.jar"
+
+LIBCUDF_BUILD_PATH=$(${MVN} help:evaluate -Dexpression=libcudf.build.path -q -DforceStdout)
# Extract the rapids-cmake sha1 that we need to pin too
-rapids_cmake_sha=$(git -C thirdparty/cudf/cpp/build/_deps/rapids-cmake-src/ rev-parse HEAD)
+rapids_cmake_sha=$(git -C ${LIBCUDF_BUILD_PATH}/_deps/rapids-cmake-src/ rev-parse HEAD)
echo "Update rapids-cmake pinned SHA1 to ${rapids_cmake_sha}"
echo "${rapids_cmake_sha}" > thirdparty/cudf-pins/rapids-cmake.sha
diff --git a/pom.xml b/pom.xml
index 566c06b934..70920a5316 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
com.nvidia
spark-rapids-jni
- 24.06.0
+ 24.08.0
jar
RAPIDS Accelerator JNI for Apache Spark
@@ -81,8 +81,11 @@
OFF
OFF
+ Release
OFF
OFF
+ OFF
+ OFF
ON
ON
false
@@ -93,16 +96,16 @@
${project.basedir}/thirdparty/cudf-pins/
3.2.4
5.8.1
- ${cudf.path}/cpp/build
+ ${project.build.directory}/libcudf/cmake-build/
false
- true
+ false
${project.build.directory}/libcudf-install
pinned
${project.build.directory}/libcudfjni
1.8
1.8
2.25.0
- ${project.build.directory}/cmake-build
+ ${project.build.directory}/jni/cmake-build
1.10.0
UTF-8
1.7.30
@@ -322,11 +325,12 @@
failonerror="true"
executable="cmake">
-
+
+ ${skipTests}
run
@@ -392,18 +396,49 @@
build-libcudf
validate
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -429,6 +464,8 @@
+
+
+
+
+
+
@@ -461,6 +502,8 @@
+
+
#include
-#include
+#include
#include
static void bloom_filter_put(nvbench::state& state)
diff --git a/src/main/cpp/faultinj/faultinj.cu b/src/main/cpp/faultinj/faultinj.cu
index 13065a81ed..fcb4b3a12d 100644
--- a/src/main/cpp/faultinj/faultinj.cu
+++ b/src/main/cpp/faultinj/faultinj.cu
@@ -136,12 +136,12 @@ CUptiResult cuptiInitialize(void)
return status;
}
-__global__ void faultInjectorKernelAssert(void)
+__global__ static void faultInjectorKernelAssert(void)
{
assert(0 && "faultInjectorKernelAssert triggered");
}
-__global__ void faultInjectorKernelTrap(void) { asm("trap;"); }
+__global__ static void faultInjectorKernelTrap(void) { asm("trap;"); }
boost::optional lookupConfig(
boost::optional domainConfigs,
diff --git a/src/main/cpp/profiler/ProfilerJni.cpp b/src/main/cpp/profiler/ProfilerJni.cpp
index 1271b89d7b..87e01b2e92 100644
--- a/src/main/cpp/profiler/ProfilerJni.cpp
+++ b/src/main/cpp/profiler/ProfilerJni.cpp
@@ -184,7 +184,8 @@ struct free_buffer_tracker {
void writer_thread_process(JavaVM* vm,
jobject j_writer,
size_t buffer_size,
- size_t flush_threshold);
+ size_t flush_threshold,
+ bool async_alloc_capture);
struct subscriber_state {
CUpti_SubscriberHandle subscriber_handle;
@@ -363,11 +364,16 @@ void setup_nvtx_env(JNIEnv* env, jstring j_lib_path)
}
// Main processing loop for the background writer thread
-void writer_thread_process(JavaVM* vm, jobject j_writer, size_t buffer_size, size_t flush_threshold)
+void writer_thread_process(JavaVM* vm,
+ jobject j_writer,
+ size_t buffer_size,
+ size_t flush_threshold,
+ bool async_alloc_capture)
{
try {
JNIEnv* env = attach_to_jvm(vm);
- profiler_serializer serializer(env, j_writer, buffer_size, flush_threshold);
+ profiler_serializer serializer(
+ env, j_writer, buffer_size, flush_threshold, async_alloc_capture);
auto buffer = State->completed_buffers.get();
while (buffer) {
serializer.process_cupti_buffer(buffer->data(), buffer->valid_size());
@@ -419,12 +425,14 @@ extern "C" {
using namespace spark_rapids_jni::profiler;
-JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_Profiler_nativeInit(JNIEnv* env,
- jclass,
- jstring j_lib_path,
- jobject j_writer,
- jlong write_buffer_size,
- jint flush_period_msec)
+JNIEXPORT void JNICALL
+Java_com_nvidia_spark_rapids_jni_Profiler_nativeInit(JNIEnv* env,
+ jclass,
+ jstring j_lib_path,
+ jobject j_writer,
+ jlong write_buffer_size,
+ jint flush_period_msec,
+                                                     jboolean async_alloc_capture)
{
try {
setup_nvtx_env(env, j_lib_path);
@@ -432,9 +440,13 @@ JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_Profiler_nativeInit(JNIE
auto writer = static_cast(env->NewGlobalRef(j_writer));
if (!writer) { throw std::runtime_error("Unable to create a global reference to writer"); }
State = new subscriber_state(writer, write_buffer_size);
- State->writer_thread = std::thread(
- writer_thread_process, get_jvm(env), writer, write_buffer_size, write_buffer_size);
- auto rc = cuptiSubscribe(&State->subscriber_handle, callback_handler, nullptr);
+ State->writer_thread = std::thread(writer_thread_process,
+ get_jvm(env),
+ writer,
+ write_buffer_size,
+ write_buffer_size,
+ async_alloc_capture);
+ auto rc = cuptiSubscribe(&State->subscriber_handle, callback_handler, nullptr);
check_cupti(rc, "Error initializing CUPTI");
rc = cuptiEnableCallback(1,
State->subscriber_handle,
diff --git a/src/main/cpp/profiler/profiler_serializer.cpp b/src/main/cpp/profiler/profiler_serializer.cpp
index b47ff234ad..84729c9dd9 100644
--- a/src/main/cpp/profiler/profiler_serializer.cpp
+++ b/src/main/cpp/profiler/profiler_serializer.cpp
@@ -197,11 +197,13 @@ ShmemLimitConfig to_shmem_limit_config(CUpti_FuncShmemLimitConfig c)
} // anonymous namespace
-profiler_serializer::profiler_serializer(JNIEnv* env,
- jobject writer,
- size_t buffer_size,
- size_t flush_threshold)
- : env_(env), j_writer_(writer), flush_threshold_(flush_threshold), fbb_(buffer_size)
+profiler_serializer::profiler_serializer(
+ JNIEnv* env, jobject writer, size_t buffer_size, size_t flush_threshold, bool capture_allocs)
+ : env_(env),
+ j_writer_(writer),
+ flush_threshold_(flush_threshold),
+ fbb_(buffer_size),
+ capture_allocs_(capture_allocs)
{
auto writer_class = env->GetObjectClass(writer);
if (!writer_class) { throw std::runtime_error("Failed to locate class of data writer"); }
@@ -322,6 +324,10 @@ void profiler_serializer::flush()
void profiler_serializer::process_api_activity(CUpti_ActivityAPI const* r)
{
+ if (r->start == 0 || r->end == 0) {
+    // Ignore records with invalid timestamps
+ return;
+ }
auto api_kind = ApiKind_Runtime;
if (r->kind == CUPTI_ACTIVITY_KIND_DRIVER) {
api_kind = ApiKind_Driver;
@@ -332,6 +338,14 @@ void profiler_serializer::process_api_activity(CUpti_ActivityAPI const* r)
case CUPTI_RUNTIME_TRACE_CBID_cudaGetLastError_v3020:
case CUPTI_RUNTIME_TRACE_CBID_cudaPeekAtLastError_v3020:
case CUPTI_RUNTIME_TRACE_CBID_cudaDeviceGetAttribute_v5000: return;
+ case CUPTI_RUNTIME_TRACE_CBID_cudaMallocAsync_v11020:
+ case CUPTI_RUNTIME_TRACE_CBID_cudaMallocAsync_ptsz_v11020:
+ case CUPTI_RUNTIME_TRACE_CBID_cudaMallocFromPoolAsync_v11020:
+ case CUPTI_RUNTIME_TRACE_CBID_cudaMallocFromPoolAsync_ptsz_v11020:
+ case CUPTI_RUNTIME_TRACE_CBID_cudaFreeAsync_v11020:
+ case CUPTI_RUNTIME_TRACE_CBID_cudaFreeAsync_ptsz_v11020:
+ if (capture_allocs_) { break; }
+ return;
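+      // (so async alloc/free runtime records are serialized only when the
+      // profiler was initialized with async allocation capture enabled)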
default: break;
}
} else {
@@ -393,6 +407,10 @@ void profiler_serializer::process_dropped_records(size_t num_dropped)
void profiler_serializer::process_kernel(CUpti_ActivityKernel8 const* r)
{
+ if (r->start == 0 || r->end == 0) {
+ // Ignore records with invalid timestamps
+ return;
+ }
auto name = fbb_.CreateSharedString(r->name);
KernelActivityBuilder kab(fbb_);
kab.add_requested(r->cacheConfig.config.requested);
@@ -443,6 +461,10 @@ void profiler_serializer::process_kernel(CUpti_ActivityKernel8 const* r)
void profiler_serializer::process_marker_activity(CUpti_ActivityMarker2 const* r)
{
+ if (r->timestamp == 0) {
+ // Ignore records with invalid timestamps
+ return;
+ }
auto object_id = add_object_id(fbb_, r->objectKind, r->objectId);
auto has_name = r->name != nullptr;
auto has_domain = r->name != nullptr;
@@ -462,6 +484,10 @@ void profiler_serializer::process_marker_activity(CUpti_ActivityMarker2 const* r
void profiler_serializer::process_marker_data(CUpti_ActivityMarkerData const* r)
{
+ if (r->flags == 0 && r->color == 0 && r->category == 0) {
+ // Ignore uninteresting marker data records
+ return;
+ }
MarkerDataBuilder mdb(fbb_);
mdb.add_flags(marker_flags_to_fb(r->flags));
mdb.add_id(r->id);
@@ -472,6 +498,10 @@ void profiler_serializer::process_marker_data(CUpti_ActivityMarkerData const* r)
void profiler_serializer::process_memcpy(CUpti_ActivityMemcpy5 const* r)
{
+ if (r->start == 0 || r->end == 0) {
+ // Ignore records with invalid timestamps
+ return;
+ }
MemcpyActivityBuilder mab(fbb_);
mab.add_copy_kind(to_memcpy_kind(r->copyKind));
mab.add_src_kind(to_memory_kind(r->srcKind));
@@ -494,6 +524,10 @@ void profiler_serializer::process_memcpy(CUpti_ActivityMemcpy5 const* r)
void profiler_serializer::process_memset(CUpti_ActivityMemset4 const* r)
{
+ if (r->start == 0 || r->end == 0) {
+ // Ignore records with invalid timestamps
+ return;
+ }
MemsetActivityBuilder mab(fbb_);
mab.add_value(r->value);
mab.add_bytes(r->bytes);
@@ -514,6 +548,10 @@ void profiler_serializer::process_memset(CUpti_ActivityMemset4 const* r)
void profiler_serializer::process_overhead(CUpti_ActivityOverhead const* r)
{
+ if (r->start == 0 || r->end == 0) {
+ // Ignore records with invalid timestamps
+ return;
+ }
auto object_id = add_object_id(fbb_, r->objectKind, r->objectId);
OverheadActivityBuilder oab(fbb_);
oab.add_overhead_kind(to_overhead_kind(r->overheadKind));
diff --git a/src/main/cpp/profiler/profiler_serializer.hpp b/src/main/cpp/profiler/profiler_serializer.hpp
index 1feebf1b96..861cf9d1ab 100644
--- a/src/main/cpp/profiler/profiler_serializer.hpp
+++ b/src/main/cpp/profiler/profiler_serializer.hpp
@@ -29,7 +29,8 @@ namespace spark_rapids_jni::profiler {
// Serializes profile data as flatbuffers
struct profiler_serializer {
- profiler_serializer(JNIEnv* env, jobject writer, size_t buffer_size, size_t flush_threshold);
+ profiler_serializer(
+ JNIEnv* env, jobject writer, size_t buffer_size, size_t flush_threshold, bool capture_allocs);
void process_cupti_buffer(uint8_t* buffer, size_t valid_size);
void flush();
@@ -51,6 +52,7 @@ struct profiler_serializer {
jmethodID j_write_method_;
jobject j_writer_;
size_t flush_threshold_;
+ bool capture_allocs_;
flatbuffers::FlatBufferBuilder fbb_;
std::vector> api_offsets_;
std::vector> device_offsets_;
diff --git a/src/main/cpp/src/CaseWhenJni.cpp b/src/main/cpp/src/CaseWhenJni.cpp
new file mode 100644
index 0000000000..2f99e85b4b
--- /dev/null
+++ b/src/main/cpp/src/CaseWhenJni.cpp
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "case_when.hpp"
+#include "cudf_jni_apis.hpp"
+
+extern "C" {
+
+JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_CaseWhen_selectFirstTrueIndex(
+ JNIEnv* env, jclass, jlongArray bool_cols)
+{
+ JNI_NULL_CHECK(env, bool_cols, "array of column handles is null", 0);
+ try {
+ cudf::jni::auto_set_device(env);
+ cudf::jni::native_jpointerArray n_cudf_bool_columns(env, bool_cols);
+ auto bool_column_views = n_cudf_bool_columns.get_dereferenced();
+ return cudf::jni::release_as_jlong(
+ spark_rapids_jni::select_first_true_index(cudf::table_view(bool_column_views)));
+ }
+ CATCH_STD(env, 0);
+}
+}
diff --git a/src/main/cpp/src/DecimalUtilsJni.cpp b/src/main/cpp/src/DecimalUtilsJni.cpp
index 6c7c1cc781..c63b84d92e 100644
--- a/src/main/cpp/src/DecimalUtilsJni.cpp
+++ b/src/main/cpp/src/DecimalUtilsJni.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -110,4 +110,24 @@ JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_DecimalUtils_subtr
CATCH_STD(env, 0);
}
+JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_DecimalUtils_floatingPointToDecimal(
+ JNIEnv* env, jclass, jlong j_input, jint output_type_id, jint precision, jint decimal_scale)
+{
+ JNI_NULL_CHECK(env, j_input, "j_input is null", 0);
+ try {
+ cudf::jni::auto_set_device(env);
+ auto const input = reinterpret_cast(j_input);
+ cudf::jni::native_jlongArray output(env, 2);
+
+ auto [casted_col, has_failure] = cudf::jni::floating_point_to_decimal(
+ *input,
+ cudf::data_type{static_cast(output_type_id), static_cast(decimal_scale)},
+ precision);
+ output[0] = cudf::jni::release_as_jlong(std::move(casted_col));
+ output[1] = static_cast(has_failure);
+ return output.get_jArray();
+ }
+ CATCH_STD(env, 0);
+}
+
} // extern "C"
diff --git a/src/main/cpp/src/HashJni.cpp b/src/main/cpp/src/HashJni.cpp
index 9e556cdd2d..c0adf38686 100644
--- a/src/main/cpp/src/HashJni.cpp
+++ b/src/main/cpp/src/HashJni.cpp
@@ -16,7 +16,7 @@
#include "cudf_jni_apis.hpp"
#include "dtype_utils.hpp"
-#include "hash.cuh"
+#include "hash.hpp"
#include "jni_utils.hpp"
extern "C" {
@@ -52,4 +52,19 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_Hash_xxhash64(JNIEnv* e
}
CATCH_STD(env, 0);
}
+
+JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_Hash_hiveHash(JNIEnv* env,
+ jclass,
+ jlongArray column_handles)
+{
+ JNI_NULL_CHECK(env, column_handles, "array of column handles is null", 0);
+
+ try {
+ cudf::jni::auto_set_device(env);
+ auto column_views =
+ cudf::jni::native_jpointerArray{env, column_handles}.get_dereferenced();
+ return cudf::jni::release_as_jlong(spark_rapids_jni::hive_hash(cudf::table_view{column_views}));
+ }
+ CATCH_STD(env, 0);
+}
}
diff --git a/src/main/cpp/src/JSONUtilsJni.cpp b/src/main/cpp/src/JSONUtilsJni.cpp
index 73b932d4b9..0da20f53f9 100644
--- a/src/main/cpp/src/JSONUtilsJni.cpp
+++ b/src/main/cpp/src/JSONUtilsJni.cpp
@@ -24,6 +24,7 @@
using path_instruction_type = spark_rapids_jni::path_instruction_type;
extern "C" {
+
JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_getJsonObject(
JNIEnv* env, jclass, jlong input_column, jobjectArray path_instructions)
{
@@ -67,4 +68,66 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_getJsonObject
}
CATCH_STD(env, 0);
}
+
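+// The paths arrive flattened: every path_instruction from all paths is packed into
+// j_paths, and j_path_offsets (num_paths + 1 entries) delimits each path's slice.
+// For example (illustrative values only), offsets {0, 2, 5} encode two paths: the
+// first owns instructions [0, 2) and the second owns [2, 5).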
+JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_getJsonObjectMultiplePaths(
+ JNIEnv* env, jclass, jlong j_input, jobjectArray j_paths, jintArray j_path_offsets)
+{
+ JNI_NULL_CHECK(env, j_input, "j_input column is null", 0);
+ JNI_NULL_CHECK(env, j_paths, "j_paths is null", 0);
+ JNI_NULL_CHECK(env, j_path_offsets, "j_path_offsets is null", 0);
+
+ using path_type = std::vector>;
+
+ try {
+ cudf::jni::auto_set_device(env);
+
+ auto const path_offsets = cudf::jni::native_jintArray(env, j_path_offsets).to_vector();
+ CUDF_EXPECTS(path_offsets.size() > 1, "Invalid path offsets.");
+ auto const num_paths = path_offsets.size() - 1;
+ std::vector paths(num_paths);
+
+ for (std::size_t i = 0; i < num_paths; ++i) {
+ auto const path_size = path_offsets[i + 1] - path_offsets[i];
+ auto path = path_type{};
+ path.reserve(path_size);
+ for (int j = path_offsets[i]; j < path_offsets[i + 1]; ++j) {
+ jobject instruction = env->GetObjectArrayElement(j_paths, j);
+ JNI_NULL_CHECK(env, instruction, "path_instruction is null", 0);
+ jclass instruction_class = env->GetObjectClass(instruction);
+ JNI_NULL_CHECK(env, instruction_class, "instruction_class is null", 0);
+
+ jfieldID field_id = env->GetFieldID(instruction_class, "type", "I");
+ JNI_NULL_CHECK(env, field_id, "field_id is null", 0);
+ jint type = env->GetIntField(instruction, field_id);
+ path_instruction_type instruction_type = static_cast(type);
+
+ field_id = env->GetFieldID(instruction_class, "name", "Ljava/lang/String;");
+ JNI_NULL_CHECK(env, field_id, "field_id is null", 0);
+ jstring name = (jstring)env->GetObjectField(instruction, field_id);
+ JNI_NULL_CHECK(env, name, "name is null", 0);
+ const char* name_str = env->GetStringUTFChars(name, JNI_FALSE);
+
+ field_id = env->GetFieldID(instruction_class, "index", "J");
+ JNI_NULL_CHECK(env, field_id, "field_id is null", 0);
+ jlong index = env->GetLongField(instruction, field_id);
+
+ path.emplace_back(instruction_type, name_str, index);
+ env->ReleaseStringUTFChars(name, name_str);
+ }
+
+ paths[i] = std::move(path);
+ }
+
+ auto const input_cv = reinterpret_cast(j_input);
+ auto output =
+ spark_rapids_jni::get_json_object_multiple_paths(cudf::strings_column_view{*input_cv}, paths);
+
+ auto out_handles = cudf::jni::native_jlongArray(env, output.size());
+ std::transform(output.begin(), output.end(), out_handles.begin(), [](auto& col) {
+ return cudf::jni::release_as_jlong(col);
+ });
+ return out_handles.get_jArray();
+ }
+ CATCH_STD(env, 0);
+}
}
diff --git a/src/main/cpp/src/SubStringIndexJni.cpp b/src/main/cpp/src/SubStringIndexJni.cpp
new file mode 100644
index 0000000000..1e53166ab7
--- /dev/null
+++ b/src/main/cpp/src/SubStringIndexJni.cpp
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cudf_jni_apis.hpp"
+#include "substring_index.hpp"
+
+extern "C" {
+
+JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_GpuSubstringIndexUtils_substringIndex(
+ JNIEnv* env, jclass, jlong strings_handle, jlong delimiter, jint count)
+{
+ JNI_NULL_CHECK(env, strings_handle, "strings column handle is null", 0);
+ JNI_NULL_CHECK(env, delimiter, "delimiter scalar handle is null", 0);
+ try {
+ cudf::jni::auto_set_device(env);
+ auto const input = reinterpret_cast(strings_handle);
+ auto const strings_column = cudf::strings_column_view{*input};
+ cudf::string_scalar* ss_scalar = reinterpret_cast(delimiter);
+ return cudf::jni::release_as_jlong(
+ spark_rapids_jni::substring_index(strings_column, *ss_scalar, count));
+ }
+ CATCH_STD(env, 0);
+}
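+
+// (Mirrors Spark's substring_index(str, delim, count); e.g. with input
+// "www.apache.org", delimiter "." and count 2 the result is "www.apache".)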
+} // extern "C"
diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu
index 5dfdd582ef..da4e3c5cb9 100644
--- a/src/main/cpp/src/bloom_filter.cu
+++ b/src/main/cpp/src/bloom_filter.cu
@@ -60,10 +60,10 @@ __device__ inline std::pair gpu_get_hash_ma
}
template
-__global__ void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter,
- cudf::size_type bloom_filter_bits,
- cudf::column_device_view input,
- cudf::size_type num_hashes)
+CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter,
+ cudf::size_type bloom_filter_bits,
+ cudf::column_device_view input,
+ cudf::size_type num_hashes)
{
size_t const tid = threadIdx.x + blockIdx.x * blockDim.x;
if (tid >= input.size()) { return; }
diff --git a/src/main/cpp/src/case_when.cu b/src/main/cpp/src/case_when.cu
new file mode 100644
index 0000000000..9857403898
--- /dev/null
+++ b/src/main/cpp/src/case_when.cu
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "case_when.hpp"
+
+#include
+#include
+#include
+#include
+
+#include
+
+namespace spark_rapids_jni {
+namespace detail {
+namespace {
+
+/**
+ * Select the column index of the first true value in the bool columns for the specified row
+ */
+struct select_first_true_fn {
+ // bool columns stores the results of executing `when` expressions
+  // the bool columns store the results of evaluating the `when` expressions
+
+ /**
+   * The number of bool columns equals the number of `case when` branches.
+   * Note: the returned index may be out of bounds; the valid bounds are [0, col_num).
+   * Returning the index col_num means the final result is the NULL value or the ELSE value.
+   *
+   * e.g.:
+   * CASE WHEN 'a' THEN 'A' END
+   * The number of bool columns is 1
+   * The number of scalars is 1
+   * Max index is 1, which means using NULL (all when expressions are false).
+   * CASE WHEN 'a' THEN 'A' ELSE '_' END
+   * The number of bool columns is 1
+   * The number of scalars is 2
+   * Max index is also 1, which means using the else value '_'
+ */
+ __device__ cudf::size_type operator()(std::size_t row_idx) const
+ {
+ auto col_num = d_table.num_columns();
+ for (auto col_idx = 0; col_idx < col_num; col_idx++) {
+ auto const& col = d_table.column(col_idx);
+ if (!col.is_null(row_idx) && col.element(row_idx)) {
+ // Predicate is true and not null
+ return col_idx;
+ }
+ }
+ return col_num;
+ }
+};
+
+} // anonymous namespace
+
+std::unique_ptr select_first_true_index(cudf::table_view const& when_bool_columns,
+ rmm::cuda_stream_view stream,
+ rmm::device_async_resource_ref mr)
+{
+ // checks
+ auto const num_columns = when_bool_columns.num_columns();
+ CUDF_EXPECTS(num_columns > 0, "At least one column must be specified");
+ auto const row_count = when_bool_columns.num_rows();
+ if (row_count == 0) { // empty begets empty
+ return cudf::make_empty_column(cudf::type_id::INT32);
+ }
+ // make output column
+ auto ret = cudf::make_numeric_column(
+ cudf::data_type{cudf::type_id::INT32}, row_count, cudf::mask_state::UNALLOCATED, stream, mr);
+
+ // select first true index
+ auto const d_table_ptr = cudf::table_device_view::create(when_bool_columns, stream);
+ thrust::transform(rmm::exec_policy(stream),
+ thrust::make_counting_iterator(0),
+ thrust::make_counting_iterator(row_count),
+ ret->mutable_view().begin(),
+ select_first_true_fn{*d_table_ptr});
+ return ret;
+}
+
+} // namespace detail
+
+std::unique_ptr select_first_true_index(cudf::table_view const& when_bool_columns,
+ rmm::cuda_stream_view stream,
+ rmm::device_async_resource_ref mr)
+{
+ return detail::select_first_true_index(when_bool_columns, stream, mr);
+}
+
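+// A minimal host-side usage sketch (assuming cudf's test column wrappers):
+//
+//   cudf::test::fixed_width_column_wrapper<bool> when0{true, false, false};
+//   cudf::test::fixed_width_column_wrapper<bool> when1{false, true, false};
+//   auto idx = select_first_true_index(cudf::table_view{{when0, when1}});
+//   // idx contains {0, 1, 2}; the trailing 2 (== num_columns) selects NULL/ELSE
+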
+} // namespace spark_rapids_jni
diff --git a/src/main/cpp/src/case_when.hpp b/src/main/cpp/src/case_when.hpp
new file mode 100644
index 0000000000..b7056c6c8f
--- /dev/null
+++ b/src/main/cpp/src/case_when.hpp
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include
+#include
+
+#include
+
+namespace spark_rapids_jni {
+
+/**
+ *
+ * Select the column index of the first true value in the bool columns.
+ * For a row that does not contain true, use the end index (the number of columns).
+ *
+ * e.g.:
+ * column 0 in table: true, false, false, false
+ * column 1 in table: false, true, false, false
+ * column 2 in table: false, false, true, false
+ *
+ * 1st row is: true, false, false; first true index is 0
+ * 2nd row is: false, true, false; first true index is 1
+ * 3rd row is: false, false, true; first true index is 2
+ * 4th row is: false, false, false; no true found, so use the end index 3
+ *
+ * output column: 0, 1, 2, 3
+ * In the `case when` context, index 3 here means using the NULL value.
+ *
+ */
+std::unique_ptr select_first_true_index(
+ cudf::table_view const& when_bool_columns,
+ rmm::cuda_stream_view stream = cudf::get_default_stream(),
+ rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());
+
+} // namespace spark_rapids_jni
diff --git a/src/main/cpp/src/cast_string.cu b/src/main/cpp/src/cast_string.cu
index bfbbc3777d..156dbeb7bf 100644
--- a/src/main/cpp/src/cast_string.cu
+++ b/src/main/cpp/src/cast_string.cu
@@ -156,14 +156,14 @@ process_value(bool first_value, T current_val, T const new_digit, bool adding)
* @param ansi_mode true if ansi mode is required, which is more strict and throws
*/
template
-void __global__ string_to_integer_kernel(T* out,
- bitmask_type* validity,
- const char* const chars,
- size_type const* offsets,
- bitmask_type const* incoming_null_mask,
- size_type num_rows,
- bool ansi_mode,
- bool strip)
+CUDF_KERNEL void string_to_integer_kernel(T* out,
+ bitmask_type* validity,
+ const char* const chars,
+ size_type const* offsets,
+ bitmask_type const* incoming_null_mask,
+ size_type num_rows,
+ bool ansi_mode,
+ bool strip)
{
auto const group = cooperative_groups::this_thread_block();
auto const warp = cooperative_groups::tiled_partition(group);
@@ -386,18 +386,17 @@ __device__ thrust::optional> validate_and_exponent
* @param scale scale of desired decimals
* @param precision precision of desired decimals
* @param ansi_mode true if ansi mode is required, which is more strict and throws
- * @return __global__
*/
template
-__global__ void string_to_decimal_kernel(T* out,
- bitmask_type* validity,
- const char* const chars,
- size_type const* offsets,
- bitmask_type const* incoming_null_mask,
- size_type num_rows,
- int32_t scale,
- int32_t precision,
- bool strip)
+CUDF_KERNEL void string_to_decimal_kernel(T* out,
+ bitmask_type* validity,
+ const char* const chars,
+ size_type const* offsets,
+ bitmask_type const* incoming_null_mask,
+ size_type num_rows,
+ int32_t scale,
+ int32_t precision,
+ bool strip)
{
auto const group = cooperative_groups::this_thread_block();
auto const warp = cooperative_groups::tiled_partition(group);
diff --git a/src/main/cpp/src/cast_string_to_float.cu b/src/main/cpp/src/cast_string_to_float.cu
index e843d645ce..c19a2a10fe 100644
--- a/src/main/cpp/src/cast_string_to_float.cu
+++ b/src/main/cpp/src/cast_string_to_float.cu
@@ -102,7 +102,7 @@ class string_to_float {
int sign = check_for_sign();
// check for leading nan
- if (check_for_nan()) {
+ if (check_for_nan(sign)) {
_out[_row] = NAN;
compute_validity(_valid, _except);
return;
@@ -112,7 +112,7 @@ class string_to_float {
if (check_for_inf()) {
if (_warp_lane == 0) {
_out[_row] =
- sign > 0 ? std::numeric_limits::infinity() : -std::numeric_limits::infinity();
+ sign >= 0 ? std::numeric_limits::infinity() : -std::numeric_limits::infinity();
}
compute_validity(_valid, _except);
return;
@@ -140,7 +140,9 @@ class string_to_float {
_except = true;
}
- if (_warp_lane == 0) { _out[_row] = sign * static_cast(0); }
+ if (_warp_lane == 0) {
+ _out[_row] = sign >= 0 ? static_cast(0) : -static_cast(0);
+ }
compute_validity(_valid, _except);
return;
}
@@ -154,15 +156,15 @@ class string_to_float {
// construct the final float value
if (_warp_lane == 0) {
// base value
- double digitsf = sign * static_cast(digits);
+ double digitsf = sign >= 0 ? static_cast(digits) : -static_cast(digits);
// exponent
int exp_ten = exp_base + manual_exp;
// final value
if (exp_ten > std::numeric_limits::max_exponent10) {
- _out[_row] = sign > 0 ? std::numeric_limits::infinity()
- : -std::numeric_limits::infinity();
+ _out[_row] = sign >= 0 ? std::numeric_limits::infinity()
+ : -std::numeric_limits::infinity();
} else {
// make sure we don't produce a subnormal number.
// - a normal number is one where the leading digit of the floating point rep is not zero.
@@ -236,32 +238,45 @@ class string_to_float {
// returns true if we encountered 'nan'
// potentially changes: valid/except
- __device__ bool check_for_nan()
+ __device__ bool check_for_nan(int const& sign)
{
auto const nan_mask = __ballot_sync(0xffffffff,
(_warp_lane == 0 && (_c == 'N' || _c == 'n')) ||
(_warp_lane == 1 && (_c == 'A' || _c == 'a')) ||
(_warp_lane == 2 && (_c == 'N' || _c == 'n')));
if (nan_mask == 0x7) {
- // if we start with 'nan', then even if we have other garbage character, this is a null row.
- //
- // if we're in ansi mode and this is not -precisely- nan, report that so that we can throw
- // an exception later.
- if (_len != 3) {
- _valid = false;
- _except = _len != 3;
- }
- return true;
+      // if we start with 'nan', then even if we have other garbage characters (excluding
+      // whitespace), this is a null row. However, for cases like "nan   ", Spark treats the
+      // value as "nan": when the trailing characters are all whitespace, it is still a valid
+      // string. if we're in ansi mode and this is not -precisely- nan, report that so that we
+      // can throw an exception later.
+
+ // move forward the current position by 3
+ _bpos += 3;
+ _c = __shfl_down_sync(0xffffffff, _c, 3);
+
+      // remove the trailing whitespace, if any exists
+ remove_leading_whitespace();
+
+      // if we're at the end and there is no sign, this is a valid NaN; Spark treats '-nan' and '+nan' as null.
+ if (_bpos == _len && sign == 0) { return true; }
+      // if we reach here, it means we have other garbage characters.
+ _valid = false;
+ _except = true;
}
return false;
}
- // returns 1 or -1 to indicate sign
+  // The `sign` variable is initialized to 0, indicating no sign.
+  // If a sign character is detected, `sign` is first set to 1, indicating `+`.
+  // If the character is `-`, `sign` is then set to -1.
+  // Returns 1, 0, or -1 to indicate the sign.
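+  // e.g. (illustrative): "2.5" -> sign 0, "+2.5" -> sign 1, "-2.5" -> sign -1;
+  // callers test `sign >= 0` so unsigned and '+' inputs take the positive path,
+  // while check_for_nan() requires sign == 0 because "+nan"/"-nan" are null in Spark.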
__device__ int check_for_sign()
{
auto const sign_mask = __ballot_sync(0xffffffff, _warp_lane == 0 && (_c == '+' || _c == '-'));
- int sign = 1;
+ int sign = 0;
if (sign_mask) {
+ sign = 1;
// NOTE: warp lane 0 is the only thread that ever reads `sign`, so technically it would be
// valid to just check if(c == '-'), but that would leave other threads with an incorrect
// value. if this code ever changes, that could lead to hard-to-find bugs.
@@ -299,11 +314,19 @@ class string_to_float {
_bpos += 5;
// if we're at the end
if (_bpos == _len) { return true; }
+ _c = __shfl_down_sync(0xffffffff, _c, 5);
}
+    // remove the remaining whitespace, if any exists
+ remove_leading_whitespace();
+
+ // if we're at the end
+ if (_bpos == _len) { return true; }
+
// if we reach here for any reason, it means we have "inf" or "infinity" at the start of the
// string but also have additional characters, making this whole thing bogus/null
_valid = false;
+
return true;
}
return false;
@@ -595,14 +618,14 @@ class string_to_float {
};
template
-__global__ void string_to_float_kernel(T* out,
- bitmask_type* validity,
- int32_t* ansi_except,
- size_type* valid_count,
- const char* const chars,
- size_type const* offsets,
- bitmask_type const* incoming_null_mask,
- size_type const num_rows)
+CUDF_KERNEL void string_to_float_kernel(T* out,
+ bitmask_type* validity,
+ int32_t* ansi_except,
+ size_type* valid_count,
+ const char* const chars,
+ size_type const* offsets,
+ bitmask_type const* incoming_null_mask,
+ size_type const num_rows)
{
size_type const tid = threadIdx.x + (blockDim.x * blockIdx.x);
size_type const row = tid / 32;
diff --git a/src/main/cpp/src/decimal_utils.cu b/src/main/cpp/src/decimal_utils.cu
index 6b9ae61076..147818d9aa 100644
--- a/src/main/cpp/src/decimal_utils.cu
+++ b/src/main/cpp/src/decimal_utils.cu
@@ -15,14 +15,23 @@
*/
#include "decimal_utils.hpp"
+#include "jni_utils.hpp"
+#include
#include
#include
+#include
+#include
#include
#include
+#include
+#include
#include
+#include
+#include
+
#include
#include
@@ -1172,4 +1181,256 @@ std::unique_ptr sub_decimal128(cudf::column_view const& a,
dec128_sub(overflows_view.begin(), sub_view, a, b));
return std::make_unique(std::move(columns));
}
+
+namespace {
+
+using namespace numeric;
+using namespace numeric::detail;
+
+/**
+ * @brief Perform floating-point to integer decimal conversion, matching Spark behavior.
+ *
+ * The desired decimal value is computed as (returned_value * 10^{-pow10}).
+ *
+ * The rounding and precision decisions made here are chosen to match Apache Spark.
+ * Spark wants to perform the conversion as double to have the most precision.
+ * However, the behavior is still slightly different if the original type was float.
+ *
+ * @tparam FloatType The type of floating-point value we are converting from
+ * @tparam IntType The type of integer we are converting to, to store the decimal value
+ *
+ * @param input The floating point value to convert
+ * @param pow10 The power of 10 to scale the floating-point value by
+ * @return Integer representation of the floating-point value, rounded after scaling
+ */
+template )>
+__device__ inline IntType scaled_round(FloatType input, int32_t pow10)
+{
+ // Extract components of the (double-ized) floating point number
+ using converter = floating_converter;
+ auto const integer_rep = converter::bit_cast_to_integer(static_cast(input));
+ if (converter::is_zero(integer_rep)) { return 0; }
+
+ // Note that the significand here is an unsigned integer with sizeof(double)
+ auto const is_negative = converter::get_is_negative(integer_rep);
+ auto const [significand, floating_pow2] = converter::get_significand_and_pow2(integer_rep);
+
+ auto const unsigned_floating = (input < 0) ? -input : input;
+ auto const rounding_wont_overflow = [&] {
+ auto const scale_factor = static_cast(
+ multiply_power10(cuda::std::make_unsigned_t{1}, -pow10));
+ return 10.0 * static_cast(unsigned_floating) * scale_factor <
+ static_cast(cuda::std::numeric_limits::max());
+ }();
+
+ // Spark often wants to round the last decimal place, so we'll perform the conversion
+ // with one lower power of 10 so that we can (optionally) round at the end.
+ // Note that we can't round this way if we've requested the minimum power.
+ bool const can_round = cuda::std::is_same_v ? rounding_wont_overflow : true;
+ auto const shifting_pow10 = can_round ? pow10 - 1 : pow10;
+
+ // Sometimes add half a bit to correct for compiler rounding to nearest floating-point value.
+ // See comments in add_half_if_truncates(), with differences detailed below.
+ // Even if we don't add the bit, shift bits to line up with what the shifting algorithm is
+ // expecting.
+ bool const is_whole_number = cuda::std::floor(input) == input;
+ auto const [base2_value, pow2] = [is_whole_number](auto significand, auto floating_pow2) {
+ if constexpr (cuda::std::is_same_v) {
+ // Add the 1/2 bit regardless of truncation, but still not for whole numbers.
+ auto const base2_value =
+ (significand << 1) + static_cast(!is_whole_number);
+ return cuda::std::make_pair(base2_value, floating_pow2 - 1);
+ } else {
+ // Input was float: never add 1/2 bit.
+ // Why? Because we converted to double, and the 1/2 bit beyond float is WAY too large compared
+ // to double's precision. And the 1/2 bit beyond double is not due to user input.
+ return cuda::std::make_pair(significand << 1, floating_pow2 - 1);
+ }
+ }(significand, floating_pow2);
+
+ // Main algorithm: Apply the powers of 2 and 10 (except for the last power-of-10).
+ // Use larger intermediate type for conversion to avoid overflow for last power-of-10.
+ using intermediate_type =
+ cuda::std::conditional_t, std::int64_t, __int128_t>;
+ cuda::std::make_unsigned_t magnitude =
+ [&, base2_value = base2_value, pow2 = pow2] {
+ if constexpr (cuda::std::is_same_v) {
+ return rounding_wont_overflow ? convert_floating_to_integral_shifting(
+ base2_value, shifting_pow10, pow2)
+ : convert_floating_to_integral_shifting(
+ base2_value, shifting_pow10, pow2);
+ } else {
+ return convert_floating_to_integral_shifting<__int128_t, double>(
+ base2_value, shifting_pow10, pow2);
+ }
+ }();
+
+ // Spark wants to floor the last digits of the output, clearing data that was beyond the
+ // precision that was available in double.
+
+ // How many digits do we need to floor?
+ // From the decimal digit corresponding to pow2 (just past double precision) to the end (pow10).
+ int const floor_pow10 = [&](int pow2_bit) {
+ // The conversion from pow2 to pow10 is log10(2), which is ~ 90/299 (close enough for ints)
+ // But Spark chooses the rougher 3/10 ratio instead of 90/299.
+ if constexpr (cuda::std::is_same_v) {
+ return (3 * pow2_bit - 10 * pow10) / 10;
+ } else {
+ // Spark rounds up the power-of-10 to floor for DOUBLES >= 2^63 (and yes, this is the exact
+ // cutoff).
+ bool const round_up = unsigned_floating > std::numeric_limits::max();
+ return (3 * pow2_bit - 10 * pow10 + 9 * round_up) / 10;
+ }
+ }(pow2);
+
+ // Floor end digits
+ if (can_round) {
+ if (floor_pow10 < 0) {
+ // Truncated: The scale factor cut off the extra, imprecise bits.
+ // To round to the final decimal place, add 5 to one past the last decimal place.
+ magnitude += 5U;
+ magnitude /= 10U; // Apply the last power of 10
+ } else {
+ // We are keeping decimal digits with data beyond the precision of double.
+ // We want to truncate these digits, but sometimes we want to round first.
+ // We will round if and only if we didn't already add a half-bit earlier.
+ if constexpr (cuda::std::is_same_v) {
+ // For doubles, only round the extra digits of whole numbers.
+ // If it was not a whole number, we already added 1/2 a bit at higher precision than this
+ // earlier.
+ if (is_whole_number) {
+ magnitude += multiply_power10(decltype(magnitude)(5), floor_pow10);
+ }
+ } else {
+ // Input was float: we didn't add a half-bit earlier, so round at the edge of precision
+ // here.
+ magnitude += multiply_power10(decltype(magnitude)(5), floor_pow10);
+ }
+
+ // +1: Divide the last power-of-10 that we postponed earlier to do rounding.
+ auto const truncated = divide_power10(magnitude, floor_pow10 + 1);
+ magnitude = multiply_power10(truncated, floor_pow10);
+ }
+ } else if (floor_pow10 > 0) {
+ auto const truncated = divide_power10(magnitude, floor_pow10);
+ magnitude = multiply_power10(truncated, floor_pow10);
+ }
+
+ // Reapply the sign and return.
+ // NOTE: Cast can overflow!
+ auto const signed_magnitude = static_cast(magnitude);
+ return is_negative ? -signed_magnitude : signed_magnitude;
+}
+
+template
+struct floating_point_to_decimal_fn {
+ cudf::column_device_view input;
+ int8_t* validity;
+ bool* has_failure;
+ int32_t decimal_places;
+ DecimalRepType exclusive_bound;
+
+ __device__ DecimalRepType operator()(cudf::size_type idx) const
+ {
+ auto const x = input.element(idx);
+
+ if (input.is_null(idx) || !std::isfinite(x)) {
+ if (!std::isfinite(x)) { *has_failure = true; }
+ validity[idx] = false;
+ return DecimalRepType{0};
+ }
+
+ auto const scaled_rounded = scaled_round(x, -decimal_places);
+ auto const is_out_of_bound =
+ -exclusive_bound >= scaled_rounded || scaled_rounded >= exclusive_bound;
+ if (is_out_of_bound) { *has_failure = true; }
+ validity[idx] = !is_out_of_bound;
+
+ return is_out_of_bound ? DecimalRepType{0} : scaled_rounded;
+ }
+};
+
+struct floating_point_to_decimal_dispatcher {
+ template
+ static constexpr bool supported_types()
+ {
+ return (std::is_same_v || //
+ std::is_same_v)&& //
+ (std::is_same_v ||
+ std::is_same_v ||
+ std::is_same_v);
+ }
+
+ template ())>
+ void operator()(Args...) const
+ {
+ CUDF_FAIL("Unsupported types for floating_point_to_decimal_fn", cudf::data_type_error);
+ }
+
+ template ())>
+ void operator()(cudf::column_view const& input,
+ cudf::mutable_column_view const& output,
+ int8_t* validity,
+ bool* has_failure,
+ int32_t decimal_places,
+ int32_t precision,
+ rmm::cuda_stream_view stream) const
+ {
+ using DecimalRepType = cudf::device_storage_type_t;
+
+ auto const d_input_ptr = cudf::column_device_view::create(input, stream);
+ auto const exclusive_bound = static_cast(
+ multiply_power10(cuda::std::make_unsigned_t{1}, precision));
+
+ thrust::tabulate(rmm::exec_policy_nosync(stream),
+ output.begin(),
+ output.end(),
+ floating_point_to_decimal_fn{
+ *d_input_ptr, validity, has_failure, decimal_places, exclusive_bound});
+ }
+};
+
+} // namespace
+
+std::pair, bool> floating_point_to_decimal(
+ cudf::column_view const& input,
+ cudf::data_type output_type,
+ int32_t precision,
+ rmm::cuda_stream_view stream,
+ rmm::device_async_resource_ref mr)
+{
+ auto output = cudf::make_fixed_point_column(
+ output_type, input.size(), cudf::mask_state::UNALLOCATED, stream, mr);
+
+ auto const decimal_places = -output_type.scale();
+ auto const default_mr = rmm::mr::get_current_device_resource();
+
+ rmm::device_uvector validity(input.size(), stream, default_mr);
+ rmm::device_scalar has_failure(false, stream, default_mr);
+
+ cudf::double_type_dispatcher(input.type(),
+ output_type,
+ floating_point_to_decimal_dispatcher{},
+ input,
+ output->mutable_view(),
+ validity.begin(),
+ has_failure.data(),
+ decimal_places,
+ precision,
+ stream);
+
+ auto [null_mask, null_count] =
+ cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity{}, stream, mr);
+ if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); }
+
+ return {std::move(output), has_failure.value(stream)};
+}
+
} // namespace cudf::jni
diff --git a/src/main/cpp/src/decimal_utils.hpp b/src/main/cpp/src/decimal_utils.hpp
index 9793e63445..8673314454 100644
--- a/src/main/cpp/src/decimal_utils.hpp
+++ b/src/main/cpp/src/decimal_utils.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -62,4 +62,23 @@ std::unique_ptr sub_decimal128(
cudf::column_view const& b,
int32_t quotient_scale,
rmm::cuda_stream_view stream = cudf::get_default_stream());
+
+/**
+ * @brief Cast floating point values to decimals, matching the behavior of Spark.
+ *
+ * @param input The input column, which is either FLOAT32 or FLOAT64 type
+ * @param output_type The output decimal type
+ * @param precision The maximum number of digits that will be preserved in the output
+ * @param stream CUDA stream used for device memory operations and kernel launches
+ * @param mr Device memory resource used to allocate the returned column's device memory
+ * @return A cudf column containing the cast result and a boolean value indicating whether the
+ *         cast operation has failed for any input rows
+ */
+std::pair, bool> floating_point_to_decimal(
+ cudf::column_view const& input,
+ cudf::data_type output_type,
+ int32_t precision,
+ rmm::cuda_stream_view stream = cudf::get_default_stream(),
+ rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());
+
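+// A usage sketch (illustrative, assuming cudf's test column wrappers):
+//
+//   cudf::test::fixed_width_column_wrapper<double> in{1.23, -4.567};
+//   auto [col, failed] = floating_point_to_decimal(
+//     in, cudf::data_type{cudf::type_id::DECIMAL64, -2}, /*precision=*/9);
+//   // col holds unscaled values {123, -457}; rows exceeding the precision
+//   // become null, and `failed` is returned true if any row failed
+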
} // namespace cudf::jni
diff --git a/src/main/cpp/src/get_json_object.cu b/src/main/cpp/src/get_json_object.cu
index 887b9887de..690da3f702 100644
--- a/src/main/cpp/src/get_json_object.cu
+++ b/src/main/cpp/src/get_json_object.cu
@@ -19,25 +19,26 @@
#include
#include
-#include
-#include
+#include
#include
#include
#include
+#include
#include
#include
-#include
#include
#include
#include
-#include
#include
#include
#include
#include
+#include
+#include
#include
+#include
#include
namespace spark_rapids_jni {
@@ -51,57 +52,52 @@ namespace detail {
constexpr int max_path_depth = 16;
/**
- * write JSON style
+ * @brief JSON style to write.
*/
-enum class write_style { RAW, QUOTED, FLATTEN };
+enum class write_style : int8_t { RAW, QUOTED, FLATTEN };
/**
- * path instruction
+ * @brief Instruction along a JSON path.
*/
struct path_instruction {
__device__ inline path_instruction(path_instruction_type _type) : type(_type) {}
- path_instruction_type type;
-
// used when type is named type
cudf::string_view name;
// used when type is index
int index{-1};
+
+ path_instruction_type type;
};
/**
- * JSON generator is used to write out JSON content.
+ * @brief JSON generator used to write out JSON content.
+ *
 * Because get_json_object only outputs a JSON object as a whole item,
 * there is no need to store internal state for a JSON object when outputting;
 * we only need to store internal state for JSON arrays.
*/
class json_generator {
public:
- __device__ json_generator(char* _output) : output(_output), output_len(0) {}
- __device__ json_generator() : output(nullptr), output_len(0) {}
+ __device__ json_generator(int _offset = 0) : offset(_offset), output_len(0) {}
  // create a nested child generator based on this parent generator; the child
  // generator is a view, and the parent and child share the same byte array
- __device__ json_generator new_child_generator()
+ __device__ json_generator new_child_generator() const
{
- if (nullptr == output) {
- return json_generator();
- } else {
- return json_generator(output + output_len);
- }
+ return json_generator(offset + output_len);
}
// write [
// add an extra comma if needed,
// e.g.: when JSON content is: [[1,2,3]
// writing a new [ should result: [[1,2,3],[
- __device__ void write_start_array()
+ __device__ void write_start_array(char* out_begin)
{
- try_write_comma();
-
- if (output) { *(output + output_len) = '['; }
+ try_write_comma(out_begin);
+ out_begin[offset + output_len] = '[';
output_len++;
array_depth++;
// new array is empty
@@ -109,14 +105,12 @@ class json_generator {
}
// write ]
- __device__ void write_end_array()
+ __device__ void write_end_array(char* out_begin)
{
- if (output) { *(output + output_len) = ']'; }
+ out_begin[offset + output_len] = ']';
output_len++;
-
// point to parent array
array_depth--;
-
// set parent array as non-empty because already had a closed child item.
is_curr_array_empty = false;
}
@@ -132,16 +126,16 @@ class json_generator {
}
// return true if it's in an array context and it's not writing the first item.
- __device__ inline bool need_comma() { return (array_depth > 0 && !is_curr_array_empty); }
+ __device__ inline bool need_comma() const { return (array_depth > 0 && !is_curr_array_empty); }
/**
* write comma according to current generator state
*/
- __device__ void try_write_comma()
+ __device__ void try_write_comma(char* out_begin)
{
if (need_comma()) {
// in array context and writes first item
- if (output) { *(output + output_len) = ','; }
+ out_begin[offset + output_len] = ',';
output_len++;
}
}
@@ -151,24 +145,16 @@ class json_generator {
* object/array, then copy to the corresponding matched end object/array.
* Return false if the JSON format is invalid, true if it is valid.
*/
- __device__ bool copy_current_structure(json_parser& parser)
+ __device__ bool copy_current_structure(json_parser& parser, char* out_begin)
{
// first try add comma
- try_write_comma();
+ try_write_comma(out_begin);
if (array_depth > 0) { is_curr_array_empty = false; }
- if (nullptr != output) {
- auto copy_to = output + output_len;
- auto [b, copy_len] = parser.copy_current_structure(copy_to);
- output_len += copy_len;
- return b;
- } else {
- char* copy_to = nullptr;
- auto [b, copy_len] = parser.copy_current_structure(copy_to);
- output_len += copy_len;
- return b;
- }
+ auto [b, copy_len] = parser.copy_current_structure(out_begin + offset + output_len);
+ output_len += copy_len;
+ return b;
}
/**
@@ -178,17 +164,12 @@ class json_generator {
* then can not return a pointer and length pair (char *, len),
* For number token, JSON parser can return a pair (char *, len)
*/
- __device__ void write_raw(json_parser& parser)
+ __device__ void write_raw(json_parser& parser, char* out_begin)
{
if (array_depth > 0) { is_curr_array_empty = false; }
- if (nullptr != output) {
- auto copied = parser.write_unescaped_text(output + output_len);
- output_len += copied;
- } else {
- auto len = parser.compute_unescaped_len();
- output_len += len;
- }
+ auto copied = parser.write_unescaped_text(out_begin + offset + output_len);
+ output_len += copied;
}
/**
@@ -222,34 +203,32 @@ class json_generator {
* block
*/
__device__ void write_child_raw_value(char* child_block_begin,
- size_t child_block_len,
+ int child_block_len,
bool write_outer_array_tokens)
{
bool insert_comma = need_comma();
if (array_depth > 0) { is_curr_array_empty = false; }
- if (nullptr != output) {
- if (write_outer_array_tokens) {
- if (insert_comma) {
- *(child_block_begin + child_block_len + 2) = ']';
- move_forward(child_block_begin, child_block_len, 2);
- *(child_block_begin + 1) = '[';
- *(child_block_begin) = ',';
- } else {
- *(child_block_begin + child_block_len + 1) = ']';
- move_forward(child_block_begin, child_block_len, 1);
- *(child_block_begin) = '[';
- }
+ if (write_outer_array_tokens) {
+ if (insert_comma) {
+ *(child_block_begin + child_block_len + 2) = ']';
+ move_forward(child_block_begin, child_block_len, 2);
+ *(child_block_begin + 1) = '[';
+ *(child_block_begin) = ',';
} else {
- if (insert_comma) {
- move_forward(child_block_begin, child_block_len, 1);
- *(child_block_begin) = ',';
- } else {
- // do not need comma && do not need write outer array tokens
- // do nothing, because child generator buff is directly after the
- // parent generator
- }
+ *(child_block_begin + child_block_len + 1) = ']';
+ move_forward(child_block_begin, child_block_len, 1);
+ *(child_block_begin) = '[';
+ }
+ } else {
+ if (insert_comma) {
+ move_forward(child_block_begin, child_block_len, 1);
+ *(child_block_begin) = ',';
+ } else {
+ // do not need comma && do not need write outer array tokens
+ // do nothing, because child generator buff is directly after the
+ // parent generator
}
}
@@ -265,7 +244,7 @@ class json_generator {
// e.g.: memory is: 1 2 0 0, begin is 1, len is 1, after moving,
// memory is: 1 1 2 0.
// Note: should move from end to begin to avoid overwrite buffer
- __device__ void move_forward(char* begin, size_t len, int forward)
+ static __device__ void move_forward(char* begin, size_t len, int forward)
{
// TODO copy by 8 bytes
char* pos = begin + len + forward - 1;
@@ -276,9 +255,8 @@ class json_generator {
}
}
- __device__ inline size_t get_output_len() const { return output_len; }
- __device__ inline char* get_output_start_position() const { return output; }
- __device__ inline char* get_current_output_position() const { return output + output_len; }
+ __device__ inline int get_offset() const { return offset; }
+ __device__ inline int get_output_len() const { return output_len; }
/**
* generator may contain trash output, e.g.: generator writes some output,
@@ -289,13 +267,14 @@ class json_generator {
__device__ inline void set_output_len(size_t len) { output_len = len; }
private:
- char* output;
- size_t output_len;
+  int offset;  // offset into the global output buffer
+ int output_len;
+
+ int array_depth = 0;
// whether an item has already been written in the current array
// used to decide whether add a comma before writing out a new item.
bool is_curr_array_empty;
- int array_depth = 0;
};
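
The reworked generator above replaces the stored `char*` with an integer offset into one shared output buffer, so child generators are cheap views rather than pointer owners. A simplified sketch of that relationship, using toy host-side types assumed for illustration (the real `json_generator` is `__device__` code):

// A child generator starts exactly where its parent currently ends:
struct toy_generator {
  int offset;      // start position within the shared output buffer
  int output_len;  // bytes written so far by this generator
  toy_generator new_child() const { return {offset + output_len, 0}; }
};
// parent {offset = 0, output_len = 5} -> child {offset = 5, output_len = 0}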
/**
@@ -353,83 +332,100 @@ __device__ inline thrust::tuple<bool, int> path_match_index_wildcard(
}
/**
- *
- * This function is rewritten from above commented recursive function.
- * this function is equivalent to the above commented recursive function.
+ * @brief The cases that mirror the Apache Spark case paths in `jsonExpressions.scala#evaluatePath()`.
*/
-__device__ bool evaluate_path(json_parser& p,
- json_generator& root_g,
- write_style root_style,
-                              cudf::device_span<path_instruction const> root_path)
-{
- // manually maintained context stack in lieu of calling evaluate_path recursively.
- struct context {
- // current token
- json_token token;
+enum class evaluation_case_path : int8_t {
+ INVALID = -1,
+ START_ARRAY___EMPTY_PATH___FLATTEN_STYLE = 2,
+ START_OBJECT___MATCHED_NAME_PATH = 4,
+ START_ARRAY___MATCHED_DOUBLE_WILDCARD = 5,
+ START_ARRAY___MATCHED_WILDCARD___STYLE_NOT_QUOTED = 6,
+ START_ARRAY___MATCHED_WILDCARD = 7,
+ START_ARRAY___MATCHED_INDEX_AND_WILDCARD = 8,
+ START_ARRAY___MATCHED_INDEX = 9
+};
+
+/**
+ * @brief Struct to store the state while processing JSON across different nesting levels.
+ */
+struct context {
+ // used to save current generator
+ json_generator g;
- // which case path that this task is from
- int case_path;
+ // used to save child JSON generator for case path 6
+ json_generator child_g;
- // used to save current generator
- json_generator g;
+  cudf::device_span<path_instruction const> path;
- write_style style;
+  // whether output has been written
+ // if dirty > 0, indicates success
+ int dirty;
-    cudf::device_span<path_instruction const> path;
- // is this context task is done
- bool task_is_done;
+ // which case path that this task is from
+ evaluation_case_path case_path;
- // whether written output
- // if dirty > 0, indicates success
- int dirty;
+ // current token
+ json_token token;
- // for some case paths
- bool is_first_enter;
+ write_style style;
- // used to save child JSON generator for case path 8
- json_generator child_g;
- };
+ // for some case paths
+ bool is_first_enter;
+
+  // whether this context task is done
+ bool task_is_done;
+};
+
+/**
+ * @brief Parse a single json string using the provided command buffer.
+ *
+ * @param input The incoming json string
+ * @param path_commands The command buffer to be applied to the string
+ * @param out_buf Buffer used to store the string resulting from the query
+ * @return A pair containing the result code and the output size
+ */
+__device__ thrust::pair<bool, cudf::size_type> evaluate_path(
+  char_range input, cudf::device_span<path_instruction const> path_commands, char* out_buf)
+{
+ json_parser p{input};
+ p.next_token();
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
// define stack; plus 1 because the root context task needs an extra slot
context stack[max_path_depth + 1];
- int stack_pos = 0;
+ int stack_size = 0;
// push context function
- auto push_context = [&stack, &stack_pos](json_token _token,
- int _case_path,
- json_generator _g,
- write_style _style,
-                                           cudf::device_span<path_instruction const> _path) {
+ auto push_context = [&p, &stack, &stack_size](evaluation_case_path _case_path,
+ json_generator _g,
+ write_style _style,
+                                                cudf::device_span<path_instruction const> _path) {
// no need to check stack is full
// because Spark-Rapids already checked maximum length of `path_instruction`
- auto& ctx = stack[stack_pos];
- ctx.token = _token;
+ auto& ctx = stack[stack_size++];
+ ctx.g = std::move(_g);
+ ctx.path = std::move(_path);
+ ctx.dirty = 0;
ctx.case_path = _case_path;
- ctx.g = _g;
+ ctx.token = p.get_current_token();
ctx.style = _style;
- ctx.path = _path;
- ctx.task_is_done = false;
- ctx.dirty = 0;
ctx.is_first_enter = true;
-
- stack_pos++;
+ ctx.task_is_done = false;
};
// put the first context task
- push_context(p.get_current_token(), -1, root_g, root_style, root_path);
+ push_context(evaluation_case_path::INVALID, json_generator{}, write_style::RAW, path_commands);
- while (stack_pos > 0) {
- auto& ctx = stack[stack_pos - 1];
+ while (stack_size > 0) {
+ auto& ctx = stack[stack_size - 1];
if (!ctx.task_is_done) {
- // task is not done.
-
// case (VALUE_STRING, Nil) if style == RawStyle
// case path 1
if (json_token::VALUE_STRING == ctx.token && path_is_empty(ctx.path.size()) &&
ctx.style == write_style::RAW) {
// there is no array wildcard or slice parent, emit this string without
// quotes write current string in parser to generator
- ctx.g.write_raw(p);
+ ctx.g.write_raw(p, out_buf);
ctx.dirty = 1;
ctx.task_is_done = true;
}
@@ -440,10 +436,13 @@ __device__ bool evaluate_path(json_parser& p,
// flatten this array into the parent
if (json_token::END_ARRAY != p.next_token()) {
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
// push back task
// add child task
- push_context(p.get_current_token(), 2, ctx.g, ctx.style, {nullptr, 0});
+ push_context(evaluation_case_path::START_ARRAY___EMPTY_PATH___FLATTEN_STYLE,
+ ctx.g,
+ ctx.style,
+ {nullptr, 0});
} else {
// END_ARRAY
ctx.task_is_done = true;
@@ -453,9 +452,9 @@ __device__ bool evaluate_path(json_parser& p,
// case path 3
else if (path_is_empty(ctx.path.size())) {
// general case: just copy the child tree verbatim
- if (!(ctx.g.copy_current_structure(p))) {
+ if (!(ctx.g.copy_current_structure(p, out_buf))) {
// JSON validation check
- return false;
+ return {false, 0};
}
ctx.dirty = 1;
ctx.task_is_done = true;
@@ -470,23 +469,22 @@ __device__ bool evaluate_path(json_parser& p,
if (ctx.dirty > 0) {
while (json_token::END_OBJECT != p.next_token()) {
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
// skip FIELD_NAME token
p.next_token();
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
// skip value of FIELD_NAME
if (!p.try_skip_children()) {
// JSON validation check
- return false;
+ return {false, 0};
}
}
- ctx.task_is_done = true;
- } else {
- return false;
}
+          // Mark the task as done regardless of whether the expected child was found.
+ ctx.task_is_done = true;
} else {
// below is 1st enter
ctx.is_first_enter = false;
@@ -494,7 +492,7 @@ __device__ bool evaluate_path(json_parser& p,
bool found_expected_child = false;
while (json_token::END_OBJECT != p.next_token()) {
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
// need to try more children
auto match_named = path_match_named(ctx.path);
@@ -504,13 +502,12 @@ __device__ bool evaluate_path(json_parser& p,
// skip FIELD_NAME token
p.next_token();
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
// meets null token, it's not expected, return false
- if (json_token::VALUE_NULL == p.get_current_token()) { return false; }
+ if (json_token::VALUE_NULL == p.get_current_token()) { return {false, 0}; }
// push sub task; sub task will update the result of path 4
- push_context(p.get_current_token(),
- 4,
+ push_context(evaluation_case_path::START_OBJECT___MATCHED_NAME_PATH,
ctx.g,
ctx.style,
{ctx.path.data() + 1, ctx.path.size() - 1});
@@ -520,12 +517,12 @@ __device__ bool evaluate_path(json_parser& p,
// skip FIELD_NAME token
p.next_token();
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
// current child is not expected, skip current child
if (!p.try_skip_children()) {
// JSON validation check
- return false;
+ return {false, 0};
}
}
}
@@ -545,19 +542,18 @@ __device__ bool evaluate_path(json_parser& p,
// behavior in Hive
if (ctx.is_first_enter) {
ctx.is_first_enter = false;
- ctx.g.write_start_array();
+ ctx.g.write_start_array(out_buf);
}
if (p.next_token() != json_token::END_ARRAY) {
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
- push_context(p.get_current_token(),
- 5,
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
+ push_context(evaluation_case_path::START_ARRAY___MATCHED_DOUBLE_WILDCARD,
ctx.g,
write_style::FLATTEN,
{ctx.path.data() + 2, ctx.path.size() - 2});
} else {
- ctx.g.write_end_array();
+ ctx.g.write_end_array(out_buf);
ctx.task_is_done = true;
}
}
@@ -590,28 +586,28 @@ __device__ bool evaluate_path(json_parser& p,
if (p.next_token() != json_token::END_ARRAY) {
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
// track the number of array elements and only emit an outer array if
// we've written more than one element, this matches Hive's behavior
- push_context(p.get_current_token(),
- 6,
+ push_context(evaluation_case_path::START_ARRAY___MATCHED_WILDCARD___STYLE_NOT_QUOTED,
child_g,
next_style,
{ctx.path.data() + 1, ctx.path.size() - 1});
} else {
- char* child_g_start = child_g.get_output_start_position();
- size_t child_g_len = child_g.get_output_len();
+ char* child_g_start = out_buf + child_g.get_offset();
+ int child_g_len = child_g.get_output_len();
if (ctx.dirty > 1) {
// add outer array tokens
ctx.g.write_child_raw_value(
child_g_start, child_g_len, /* write_outer_array_tokens */ true);
- ctx.task_is_done = true;
} else if (ctx.dirty == 1) {
// remove outer array tokens
ctx.g.write_child_raw_value(
child_g_start, child_g_len, /* write_outer_array_tokens */ false);
- ctx.task_is_done = true;
} // else do not write anything
+
+          // Done anyway, since we have already reached the end of the array.
+ ctx.task_is_done = true;
}
}
// case (START_ARRAY, Wildcard :: xs)
@@ -620,21 +616,20 @@ __device__ bool evaluate_path(json_parser& p,
path_match_element(ctx.path, path_instruction_type::WILDCARD)) {
if (ctx.is_first_enter) {
ctx.is_first_enter = false;
- ctx.g.write_start_array();
+ ctx.g.write_start_array(out_buf);
}
if (p.next_token() != json_token::END_ARRAY) {
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
// wildcards can have multiple matches, continually update the dirty
// count
- push_context(p.get_current_token(),
- 7,
+ push_context(evaluation_case_path::START_ARRAY___MATCHED_WILDCARD,
ctx.g,
write_style::QUOTED,
{ctx.path.data() + 1, ctx.path.size() - 1});
} else {
- ctx.g.write_end_array();
+ ctx.g.write_end_array(out_buf);
ctx.task_is_done = true;
}
}
@@ -646,28 +641,27 @@ __device__ bool evaluate_path(json_parser& p,
p.next_token();
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
ctx.is_first_enter = false;
int i = idx;
while (i > 0) {
if (p.get_current_token() == json_token::END_ARRAY) {
// terminate, nothing has been written
- return false;
+ return {false, 0};
}
- if (!p.try_skip_children()) { return false; }
+ if (!p.try_skip_children()) { return {false, 0}; }
p.next_token();
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
--i;
}
// i == 0
- push_context(p.get_current_token(),
- 8,
+ push_context(evaluation_case_path::START_ARRAY___MATCHED_INDEX_AND_WILDCARD,
ctx.g,
write_style::QUOTED,
{ctx.path.data() + 1, ctx.path.size() - 1});
@@ -679,338 +673,447 @@ __device__ bool evaluate_path(json_parser& p,
p.next_token();
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
int i = idx;
while (i > 0) {
if (p.get_current_token() == json_token::END_ARRAY) {
// terminate, nothing has been written
- return false;
+ return {false, 0};
}
- if (!p.try_skip_children()) { return false; }
+ if (!p.try_skip_children()) { return {false, 0}; }
p.next_token();
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
--i;
}
// i == 0
- push_context(
- p.get_current_token(), 9, ctx.g, ctx.style, {ctx.path.data() + 1, ctx.path.size() - 1});
+ push_context(evaluation_case_path::START_ARRAY___MATCHED_INDEX,
+ ctx.g,
+ ctx.style,
+ {ctx.path.data() + 1, ctx.path.size() - 1});
}
// case _ =>
// case path 12
else {
- if (!p.try_skip_children()) { return false; }
+ if (!p.try_skip_children()) { return {false, 0}; }
// default case path, return false for this task
ctx.dirty = 0;
ctx.task_is_done = true;
}
- } else {
- // current context is done.
-
+ } // if (!ctx.task_is_done)
+ else { // current context is done.
// pop current top context
- stack_pos--;
+ stack_size--;
- // pop parent task
+ // has no parent task, stack is empty, will exit
+ if (stack_size == 0) { break; }
+
+ // peek parent context task
// update parent task info according to current task result
- if (stack_pos > 0) {
- // peek parent context task
- auto& p_ctx = stack[stack_pos - 1];
-
- // case (VALUE_STRING, Nil) if style == RawStyle
- // case path 1
- if (1 == ctx.case_path) {
- // never happen
- }
- // path 2: case (START_ARRAY, Nil) if style == FlattenStyle
- // path 5: case (START_ARRAY, Wildcard :: Wildcard :: xs)
- // path 7: case (START_ARRAY, Wildcard :: xs)
- else if (2 == ctx.case_path || 5 == ctx.case_path || 7 == ctx.case_path) {
+ auto& p_ctx = stack[stack_size - 1];
+
+ switch (ctx.case_path) {
+ // path 2: case (START_ARRAY, Nil) if style == FlattenStyle
+ // path 5: case (START_ARRAY, Wildcard :: Wildcard :: xs)
+ // path 7: case (START_ARRAY, Wildcard :: xs)
+ case evaluation_case_path::START_ARRAY___EMPTY_PATH___FLATTEN_STYLE:
+ case evaluation_case_path::START_ARRAY___MATCHED_DOUBLE_WILDCARD:
+ case evaluation_case_path::START_ARRAY___MATCHED_WILDCARD: {
// collect result from child task
p_ctx.dirty += ctx.dirty;
// copy generator states to parent task;
p_ctx.g = ctx.g;
+
+ break;
}
- // case (START_OBJECT, Named :: xs)
- // case path 4
- else if (4 == ctx.case_path) {
+
+ // case (START_OBJECT, Named :: xs)
+ // case path 4
+ case evaluation_case_path::START_OBJECT___MATCHED_NAME_PATH: {
p_ctx.dirty = ctx.dirty;
// copy generator states to parent task;
p_ctx.g = ctx.g;
+
+ break;
}
- // case (START_ARRAY, Wildcard :: xs) if style != QuotedStyle
- // case path 6
- else if (6 == ctx.case_path) {
+
+ // case (START_ARRAY, Wildcard :: xs) if style != QuotedStyle
+ // case path 6
+ case evaluation_case_path::START_ARRAY___MATCHED_WILDCARD___STYLE_NOT_QUOTED: {
// collect result from child task
p_ctx.dirty += ctx.dirty;
// update child generator for parent task
p_ctx.child_g = ctx.g;
+
+ break;
}
- /* case (START_ARRAY, Index(idx) :: (xs@Wildcard :: _)) */
- // case path 8
- // case (START_ARRAY, Index(idx) :: xs)
- // case path 9
- else if (8 == ctx.case_path || 9 == ctx.case_path) {
+
+ /* case (START_ARRAY, Index(idx) :: (xs@Wildcard :: _)) */
+ // case path 8
+ // case (START_ARRAY, Index(idx) :: xs)
+ // case path 9
+ case evaluation_case_path::START_ARRAY___MATCHED_INDEX_AND_WILDCARD:
+ case evaluation_case_path::START_ARRAY___MATCHED_INDEX: {
// collect result from child task
p_ctx.dirty += ctx.dirty;
// post logic:
while (p.next_token() != json_token::END_ARRAY) {
// JSON validation check
- if (json_token::ERROR == p.get_current_token()) { return false; }
+ if (json_token::ERROR == p.get_current_token()) { return {false, 0}; }
// advance the token stream to the end of the array
- if (!p.try_skip_children()) { return false; }
+ if (!p.try_skip_children()) { return {false, 0}; }
}
// task is done
p_ctx.task_is_done = true;
// copy generator states to parent task;
p_ctx.g = ctx.g;
+
+ break;
}
- // case path 3: case (_, Nil)
- // case path 12: case _ =>
- // others
- else {
- // never happen
- }
- } else {
- // has no parent task, stack is empty, will exit
- }
- }
- }
- // copy output len
- root_g.set_output_len(stack[0].g.get_output_len());
- return stack[0].dirty > 0;
-}
+ default:; // Never happens!
+ } // end switch (ctx.case_path)
+ } // ctx.task_is_done
+ } // while (stack_size > 0)
-rmm::device_uvector<path_instruction> construct_path_commands(
-  std::vector<std::tuple<path_instruction_type, std::string, int32_t>> const& instructions,
- cudf::string_scalar const& all_names_scalar,
- rmm::cuda_stream_view stream,
- rmm::device_async_resource_ref mr)
-{
- int name_pos = 0;
-
- // construct the path commands
-  std::vector<path_instruction> path_commands;
- for (auto const& inst : instructions) {
- auto const& [type, name, index] = inst;
- switch (type) {
- case path_instruction_type::WILDCARD:
- path_commands.emplace_back(path_instruction{path_instruction_type::WILDCARD});
- break;
- case path_instruction_type::INDEX:
- path_commands.emplace_back(path_instruction{path_instruction_type::INDEX});
- path_commands.back().index = index;
- break;
- case path_instruction_type::NAMED:
- path_commands.emplace_back(path_instruction{path_instruction_type::NAMED});
- path_commands.back().name =
- cudf::string_view(all_names_scalar.data() + name_pos, name.size());
- name_pos += name.size();
- break;
- default: CUDF_FAIL("Invalid path instruction type");
- }
- }
- // convert to uvector
- return cudf::detail::make_device_uvector_sync(path_commands, stream, mr);
+ auto const success = stack[0].dirty > 0;
+
+  // The generator may contain trash output, e.g.: the generator wrote some output,
+  // then the JSON format turned out to be invalid, making that previous output trash.
+  // In that case we need to return the output size as zero.
+ return {success, success ? stack[0].g.get_output_len() : 0};
}
/**
- * @brief Parse a single json string using the provided command buffer
+ * @brief Struct storing data such as path instructions, output buffer, etc., corresponding to a
+ * single JSON path.
+ */
+struct json_path_processing_data {
+  cudf::device_span<path_instruction const> path_commands;
+ cudf::detail::input_offsetalator offsets;
+  thrust::pair<char const*, cudf::size_type>* out_stringviews;
+ char* out_buf;
+ int8_t* has_out_of_bound;
+};
+
+/**
+ * @brief Kernel for running the JSONPath query, in which one input row is processed by an entire
+ * warp (or multiple warps) of threads.
*
+ * The number of warps processing each row is computed as `ceil(num_paths / warp_size)`.
*
- * @param input The incoming json string
- * @param input_len Size of the incoming json string
- * @param path_commands_ptr The command buffer to be applied to the string.
- * @param path_commands_size The command buffer size.
- * @param out_buf Buffer user to store the results of the query
- * (nullptr in the size computation step)
- * @param out_buf_size Size of the output buffer
- * @returns A pair containing the result code and the output buffer.
+ * We explicitly set a value for the `min_block_per_sm` parameter in the launch bounds to avoid
+ * spilling from the kernel itself. By default NVCC uses a heuristic to find a balance between
+ * the maximum number of registers used by a kernel and the parallelism of the kernel.
+ * If lots of registers are used the parallelism may suffer. But in our case NVCC gets this wrong
+ * and we want to avoid spilling all the time or else the performance is really bad. This
+ * essentially tells NVCC to prefer using lots of registers over spilling.
+ *
+ * @param input The input JSON strings stored in a strings column
+ * @param path_data Array containing all path data
+ * @param num_threads_per_row Number of threads processing each input row
*/
-__device__ thrust::pair<bool, cudf::size_type> get_json_object_single(
-  char_range input,
-  cudf::device_span<path_instruction const> path_commands,
-  char* out_buf,
-  size_t out_buf_size)
+template <int block_size, int min_block_per_sm>
+__launch_bounds__(block_size, min_block_per_sm) CUDF_KERNEL
+ void get_json_object_kernel(cudf::column_device_view input,
+                              cudf::device_span<json_path_processing_data> path_data,
+ std::size_t num_threads_per_row)
{
- json_parser j_parser(input);
- j_parser.next_token();
- // JSON validation check
- if (json_token::ERROR == j_parser.get_current_token()) { return {false, 0}; }
-
- // First pass: preprocess sizes.
- // Second pass: writes output.
- // The generator automatically determines which pass based on `out_buf`.
- // If `out_buf_size` is zero, pass in `nullptr` to avoid generator writing trash output.
- json_generator generator((out_buf_size == 0) ? nullptr : out_buf);
-
- bool const success = evaluate_path(
- j_parser, generator, write_style::RAW, {path_commands.data(), path_commands.size()});
-
- if (!success) {
- // generator may contain trash output, e.g.: generator writes some output,
- // then JSON format is invalid, the previous output becomes trash.
- // set output as zero to tell second step
- generator.set_output_len_zero();
+ auto const tidx = cudf::detail::grid_1d::global_thread_id();
+ auto const row_idx = tidx / num_threads_per_row;
+ if (row_idx >= input.size()) { return; }
+
+ auto const path_idx = tidx % num_threads_per_row;
+ if (path_idx >= path_data.size()) { return; }
+
+ auto const& path = path_data[path_idx];
+ char* const dst = path.out_buf + path.offsets[row_idx];
+ bool is_valid = false;
+ cudf::size_type out_size = 0;
+
+  auto const str = input.element<cudf::string_view>(row_idx);
+ if (str.size_bytes() > 0) {
+ thrust::tie(is_valid, out_size) = evaluate_path(char_range{str}, path.path_commands, dst);
+
+ auto const max_size = path.offsets[row_idx + 1] - path.offsets[row_idx];
+ if (out_size > max_size) { *(path.has_out_of_bound) = 1; }
}
- return {success, generator.get_output_len()};
+ // Write out `nullptr` in the output string_view to indicate that the output is a null.
+  // The situation `out_stringviews == nullptr` should only happen if the kernel is launched a
+  // second time due to an out-of-bound write in the first launch.
+ if (path.out_stringviews) {
+ path.out_stringviews[row_idx] = {is_valid ? dst : nullptr, out_size};
+ }
}
/**
- * @brief Kernel for running the JSONPath query.
+ * @brief A utility class to launch the main kernel.
+ */
+struct kernel_launcher {
+ static void exec(cudf::column_device_view const& input,
+                   cudf::device_span<json_path_processing_data> path_data,
+ rmm::cuda_stream_view stream)
+ {
+    // The optimal values for block_size and min_block_per_sm were found through testing:
+    // either 128-8 or 256-4, with 128-8 seeming slightly better.
+ static constexpr int block_size = 128;
+ static constexpr int min_block_per_sm = 8;
+
+ // The number of threads for processing one input row is at least one warp.
+ auto const num_threads_per_row =
+ cudf::util::div_rounding_up_safe(path_data.size(),
+                                       static_cast<std::size_t>(cudf::detail::warp_size)) *
+ cudf::detail::warp_size;
+ auto const num_blocks = cudf::util::div_rounding_up_safe(num_threads_per_row * input.size(),
+                                                    static_cast<std::size_t>(block_size));
+    get_json_object_kernel<block_size, min_block_per_sm>
+      <<<num_blocks, block_size, 0, stream.value()>>>(input, path_data, num_threads_per_row);
+ }
+};
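
As a sanity check of the launch math above, a few worked values (all assumed for illustration, with `warp_size` = 32 and `block_size` = 128):

// num_paths = 3  -> num_threads_per_row = ceil(3 / 32) * 32  = 32 (one warp per row)
// num_paths = 40 -> num_threads_per_row = ceil(40 / 32) * 32 = 64 (two warps per row)
// num_rows = 1000 with num_threads_per_row = 32
//   -> num_blocks = ceil(32 * 1000 / 128) = 250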
+
+/**
+ * @brief Construct the device vector containing necessary data for the input JSON paths.
*
- * This kernel operates in a 2-pass way. On the first pass it computes the
- * output sizes. On the second pass, it fills in the provided output buffers
- * (chars and validity).
+ * All JSON paths are processed at once, without stream synchronization, to minimize overhead.
*
- * @param col Device view of the incoming string
- * @param path_commands JSONPath command buffer
- * @param d_sizes a buffer used to write the output sizes in the first pass,
- * and is read back in on the second pass to compute offsets.
- * @param output_offsets Buffer used to store the string offsets for the results
- * of the query
- * @param out_buf Buffer used to store the results of the query
- * @param out_validity Output validity buffer
- * @param out_valid_count Output count of # of valid bits
+ * A tuple of values is returned; however, only the first element is needed for the subsequent
+ * kernel launch. The remaining elements are unused but must be kept alive, as they contain data
+ * for later asynchronous host-to-device memcpy.
*/
-template <int block_size>
-// We have 1 for the minBlocksPerMultiprocessor in the launch bounds to avoid spilling from
-// the kernel itself. By default NVCC uses a heuristic to find a balance between the
-// maximum number of registers used by a kernel and the parallelism of the kernel.
-// If lots of registers are used the parallelism may suffer. But in our case
-// NVCC gets this wrong and we want to avoid spilling all the time or else
-// the performance is really bad. This essentially tells NVCC to prefer using lots
-// of registers over spilling.
-__launch_bounds__(block_size, 1) CUDF_KERNEL
- void get_json_object_kernel(cudf::column_device_view col,
-                              cudf::device_span<path_instruction const> path_commands,
- cudf::size_type* d_sizes,
- cudf::detail::input_offsetalator output_offsets,
- char* out_buf,
- cudf::bitmask_type* out_validity,
- cudf::size_type* out_valid_count)
+std::tuple<std::vector<rmm::device_uvector<path_instruction>>,
+           std::unique_ptr<std::vector<std::vector<path_instruction>>>,
+           cudf::string_scalar,
+           std::string>
+construct_path_commands(
+  std::vector<std::vector<std::tuple<path_instruction_type, std::string, int32_t>>> const&
+    json_paths,
+  rmm::cuda_stream_view stream)
{
- auto tid = cudf::detail::grid_1d::global_thread_id();
- auto const stride = cudf::detail::grid_1d::grid_stride();
-
- cudf::size_type warp_valid_count{0};
-
- auto active_threads = __ballot_sync(0xffff'ffffu, tid < col.size());
- while (tid < col.size()) {
- bool is_valid = false;
-    cudf::string_view const str = col.element<cudf::string_view>(tid);
- if (str.size_bytes() > 0) {
- char* dst = out_buf != nullptr ? out_buf + output_offsets[tid] : nullptr;
- size_t const dst_size =
- out_buf != nullptr ? output_offsets[tid + 1] - output_offsets[tid] : 0;
-
- // process one single row
- auto [result, output_size] =
- get_json_object_single(str, {path_commands.data(), path_commands.size()}, dst, dst_size);
- if (result) { is_valid = true; }
-
- // filled in only during the precompute step. during the compute step, the
- // offsets are fed back in so we do -not- want to write them out
- if (out_buf == nullptr) { d_sizes[tid] = static_cast(output_size); }
- } else {
- // valid JSON length is always greater than 0
- // if `str` size len is zero, output len is 0 and `is_valid` is false
- if (out_buf == nullptr) { d_sizes[tid] = 0; }
+ // Concatenate all names from path instructions.
+ auto h_inst_names = [&] {
+ std::size_t length{0};
+ for (auto const& instructions : json_paths) {
+ for (auto const& [type, name, index] : instructions) {
+ if (type == path_instruction_type::NAMED) { length += name.length(); }
+ }
}
-
- // validity filled in only during the output step
- if (out_validity != nullptr) {
- uint32_t mask = __ballot_sync(active_threads, is_valid);
- // 0th lane of the warp writes the validity
- if (!(tid % cudf::detail::warp_size)) {
- out_validity[cudf::word_index(tid)] = mask;
- warp_valid_count += __popc(mask);
+ std::string all_names;
+ all_names.reserve(length);
+ for (auto const& instructions : json_paths) {
+ for (auto const& [type, name, index] : instructions) {
+ if (type == path_instruction_type::NAMED) { all_names += name; }
}
}
+ return all_names;
+ }();
+ auto d_inst_names = cudf::string_scalar(h_inst_names, true, stream);
- tid += stride;
- active_threads = __ballot_sync(active_threads, tid < col.size());
+ std::size_t name_pos{0};
+  auto h_path_commands = std::make_unique<std::vector<std::vector<path_instruction>>>();
+ h_path_commands->reserve(json_paths.size());
+
+ for (auto const& instructions : json_paths) {
+ h_path_commands->emplace_back();
+ auto& path_commands = h_path_commands->back();
+ path_commands.reserve(instructions.size());
+
+ for (auto const& [type, name, index] : instructions) {
+ path_commands.emplace_back(path_instruction{type});
+
+ if (type == path_instruction_type::INDEX) {
+ path_commands.back().index = index;
+ } else if (type == path_instruction_type::NAMED) {
+ path_commands.back().name = cudf::string_view(d_inst_names.data() + name_pos, name.size());
+ name_pos += name.size();
+ } else if (type != path_instruction_type::WILDCARD) {
+ CUDF_FAIL("Invalid path instruction type");
+ }
+ }
}
- // sum the valid counts across the whole block
- if (out_valid_count != nullptr) {
-    cudf::size_type block_valid_count =
-      cudf::detail::single_lane_block_sum_reduce<block_size>(warp_valid_count);
- if (threadIdx.x == 0) { atomicAdd(out_valid_count, block_valid_count); }
+  auto d_path_commands = std::vector<rmm::device_uvector<path_instruction>>{};
+ d_path_commands.reserve(h_path_commands->size());
+ for (auto const& path_commands : *h_path_commands) {
+ d_path_commands.emplace_back(cudf::detail::make_device_uvector_async(
+ path_commands, stream, rmm::mr::get_current_device_resource()));
}
+
+ return {std::move(d_path_commands),
+ std::move(h_path_commands),
+ std::move(d_inst_names),
+ std::move(h_inst_names)};
}
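
The keep-alive requirement documented above exists because `make_device_uvector_async` copies from host memory without synchronizing the stream. A sketch of the hazard this avoids (an assumed, simplified example, not code from this patch):

#include <cudf/detail/utilities/vector_factories.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <vector>

// BAD (illustrative): host_data may be destroyed before the asynchronous
// host-to-device copy completes, since nothing synchronizes the stream here.
rmm::device_uvector<int> make_dangling(rmm::cuda_stream_view stream)
{
  std::vector<int> host_data{1, 2, 3};
  return cudf::detail::make_device_uvector_async(
    host_data, stream, rmm::mr::get_current_device_resource());
}  // host_data is freed here; the copy may still be in flight.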
-std::unique_ptr<cudf::column> get_json_object(
+std::vector<std::unique_ptr<cudf::column>> get_json_object(
cudf::strings_column_view const& input,
-  std::vector<std::tuple<path_instruction_type, std::string, int32_t>> const& instructions,
+  std::vector<std::vector<std::tuple<path_instruction_type, std::string, int32_t>>> const&
+    json_paths,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
- if (input.is_empty()) return cudf::make_empty_column(cudf::type_id::STRING);
+ auto const num_outputs = json_paths.size();
+  std::vector<std::unique_ptr<cudf::column>> output;
+
+ // Input is empty or all nulls - just return all null columns.
+ if (input.is_empty() || input.size() == input.null_count()) {
+ for (std::size_t idx = 0; idx < num_outputs; ++idx) {
+      output.emplace_back(std::make_unique<cudf::column>(input.parent(), stream, mr));
+ }
+ return output;
+ }
+
+ auto const d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
+ auto const in_offsets =
+ cudf::detail::offsetalator_factory::make_input_iterator(input.offsets(), input.offset());
+ auto const [d_json_paths, h_json_paths, d_inst_names, h_inst_names] =
+ construct_path_commands(json_paths, stream);
+
+ auto const max_row_size = thrust::transform_reduce(
+ rmm::exec_policy(stream),
+ thrust::make_counting_iterator(0),
+ thrust::make_counting_iterator(input.size()),
+    cuda::proclaim_return_type<int64_t>(
+ [in_offsets] __device__(auto const idx) { return in_offsets[idx + 1] - in_offsets[idx]; }),
+ int64_t{0},
+ thrust::maximum{});
+
+ // We will use scratch buffers to store the output strings without knowing their sizes.
+ // Since we do not know their sizes, we need to allocate the buffer a bit larger than the input
+ // size so that we will not write output strings into an out-of-bound position.
+ // Checking out-of-bound needs to be performed in the main kernel to make sure we will not have
+ // data corruption.
+ auto const scratch_size = [&, max_row_size = max_row_size] {
+ // Pad the scratch buffer by an additional size that is a multiple of max row size.
+ auto constexpr padding_rows = 10;
+ return input.chars_size(stream) + max_row_size * padding_rows;
+ }();
+
+  rmm::device_uvector<int8_t> d_has_out_of_bound(num_outputs, stream);
+  std::vector<rmm::device_uvector<char>> scratch_buffers;
+  std::vector<rmm::device_uvector<thrust::pair<char const*, cudf::size_type>>> out_stringviews;
+  std::vector<json_path_processing_data> h_path_data;
+ scratch_buffers.reserve(json_paths.size());
+ out_stringviews.reserve(json_paths.size());
+ h_path_data.reserve(json_paths.size());
+
+ for (std::size_t idx = 0; idx < num_outputs; ++idx) {
+ auto const& instructions = json_paths[idx];
+ if (instructions.size() > max_path_depth) { CUDF_FAIL("JSONPath query exceeds maximum depth"); }
+
+    scratch_buffers.emplace_back(rmm::device_uvector<char>(scratch_size, stream));
+    out_stringviews.emplace_back(rmm::device_uvector<thrust::pair<char const*, cudf::size_type>>{
+      static_cast<std::size_t>(input.size()), stream});
+
+ h_path_data.emplace_back(json_path_processing_data{d_json_paths[idx],
+ in_offsets,
+ out_stringviews.back().data(),
+ scratch_buffers.back().data(),
+ d_has_out_of_bound.data() + idx});
+ }
+ auto d_path_data = cudf::detail::make_device_uvector_async(
+ h_path_data, stream, rmm::mr::get_current_device_resource());
+ thrust::uninitialized_fill(
+ rmm::exec_policy(stream), d_has_out_of_bound.begin(), d_has_out_of_bound.end(), 0);
+ kernel_launcher::exec(*d_input_ptr, d_path_data, stream);
+
+ // Do not use parallel check since we do not have many elements.
+ auto h_has_out_of_bound = cudf::detail::make_host_vector_sync(d_has_out_of_bound, stream);
+ auto has_no_oob = std::none_of(
+ h_has_out_of_bound.begin(), h_has_out_of_bound.end(), [](auto const val) { return val != 0; });
+
+ // If we didn't see any out-of-bound write, everything is good so far.
+ // Just gather the output strings and return.
+ if (has_no_oob) {
+ for (auto const& out_sview : out_stringviews) {
+ output.emplace_back(cudf::make_strings_column(out_sview, stream, mr));
+ }
+ return output;
+ }
+  // From here on, we know an out-of-bound write occurred. Although this is very rare, it may still happen.
+
+  std::vector<std::pair<rmm::device_buffer, cudf::size_type>> out_null_masks_and_null_counts;
+  std::vector<std::pair<std::unique_ptr<cudf::column>, int64_t>> out_offsets_and_sizes;
+  std::vector<rmm::device_uvector<char>> out_char_buffers;
+  std::vector<std::size_t> oob_indices;
- if (instructions.size() > max_path_depth) { CUDF_FAIL("JSONPath query exceeds maximum depth"); }
+ // Check validity from the stored char pointers.
+  auto const validator = [] __device__(thrust::pair<char const*, cudf::size_type> const item) {
+ return item.first != nullptr;
+ };
- // get a string buffer to store all the names and convert to device
- std::string all_names;
- for (auto const& inst : instructions) {
- all_names += std::get<1>(inst);
+  // Rebuild the data only for paths that had an out-of-bound write.
+ h_path_data.clear();
+ for (std::size_t idx = 0; idx < num_outputs; ++idx) {
+ auto const& out_sview = out_stringviews[idx];
+
+ if (h_has_out_of_bound[idx]) {
+ oob_indices.emplace_back(idx);
+      output.emplace_back(nullptr);  // just a placeholder.
+
+ out_null_masks_and_null_counts.emplace_back(
+ cudf::detail::valid_if(out_sview.begin(), out_sview.end(), validator, stream, mr));
+
+ // The string sizes computed in the previous kernel call will be used to allocate a new char
+ // buffer to store the output.
+ auto const size_it = cudf::detail::make_counting_transform_iterator(
+ 0,
+        cuda::proclaim_return_type<cudf::size_type>(
+ [string_pairs = out_sview.data()] __device__(auto const idx) {
+ return string_pairs[idx].second;
+ }));
+ out_offsets_and_sizes.emplace_back(cudf::strings::detail::make_offsets_child_column(
+ size_it, size_it + input.size(), stream, mr));
+ out_char_buffers.emplace_back(
+        rmm::device_uvector<char>(out_offsets_and_sizes.back().second, stream, mr));
+
+ h_path_data.emplace_back(
+ json_path_processing_data{d_json_paths[idx],
+ cudf::detail::offsetalator_factory::make_input_iterator(
+ out_offsets_and_sizes.back().first->view()),
+ nullptr /*out_stringviews*/,
+ out_char_buffers.back().data(),
+ d_has_out_of_bound.data() + idx});
+ } else {
+ output.emplace_back(cudf::make_strings_column(out_sview, stream, mr));
+ }
+ }
+ // These buffers are no longer needed.
+ scratch_buffers.clear();
+ out_stringviews.clear();
+
+ // Push data to the GPU and launch the kernel again.
+ d_path_data = cudf::detail::make_device_uvector_async(
+ h_path_data, stream, rmm::mr::get_current_device_resource());
+ thrust::uninitialized_fill(
+ rmm::exec_policy(stream), d_has_out_of_bound.begin(), d_has_out_of_bound.end(), 0);
+ kernel_launcher::exec(*d_input_ptr, d_path_data, stream);
+
+  // Check out-of-bound again to make sure everything looks right.
+ h_has_out_of_bound = cudf::detail::make_host_vector_sync(d_has_out_of_bound, stream);
+ has_no_oob = std::none_of(
+ h_has_out_of_bound.begin(), h_has_out_of_bound.end(), [](auto const val) { return val != 0; });
+
+ // The last kernel call should not encounter any out-of-bound write.
+  // If OOB is still detected, something must have gone wrong.
+ CUDF_EXPECTS(has_no_oob, "Unexpected out-of-bound write in get_json_object kernel.");
+
+ for (std::size_t idx = 0; idx < oob_indices.size(); ++idx) {
+ auto const out_idx = oob_indices[idx];
+ output[out_idx] =
+ cudf::make_strings_column(input.size(),
+ std::move(out_offsets_and_sizes[idx].first),
+ out_char_buffers[idx].release(),
+ out_null_masks_and_null_counts[idx].second,
+ std::move(out_null_masks_and_null_counts[idx].first));
}
- cudf::string_scalar all_names_scalar(all_names, true, stream);
- // parse the json_path into a command buffer
- auto path_commands = construct_path_commands(
- instructions, all_names_scalar, stream, rmm::mr::get_current_device_resource());
-
- // compute output sizes
-  auto sizes = rmm::device_uvector<cudf::size_type>(
- input.size(), stream, rmm::mr::get_current_device_resource());
- auto d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(input.offsets());
-
- constexpr int block_size = 512;
- cudf::detail::grid_1d const grid{input.size(), block_size};
- auto d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
- // preprocess sizes (returned in the offsets buffer)
-  get_json_object_kernel<block_size>
-    <<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
- *d_input_ptr, path_commands, sizes.data(), d_offsets, nullptr, nullptr, nullptr);
-
- // convert sizes to offsets
- auto [offsets, output_size] =
- cudf::strings::detail::make_offsets_child_column(sizes.begin(), sizes.end(), stream, mr);
- d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(offsets->view());
-
- // allocate output string column
- rmm::device_uvector chars(output_size, stream, mr);
-
- // potential optimization : if we know that all outputs are valid, we could
- // skip creating the validity mask altogether
- rmm::device_buffer validity =
- cudf::detail::create_null_mask(input.size(), cudf::mask_state::UNINITIALIZED, stream, mr);
-
- // compute results
- rmm::device_scalar d_valid_count{0, stream};
-
-  get_json_object_kernel<block_size>
-    <<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
- *d_input_ptr,
- path_commands,
- sizes.data(),
- d_offsets,
- chars.data(),
- static_cast(validity.data()),
- d_valid_count.data());
-
- return make_strings_column(input.size(),
- std::move(offsets),
- chars.release(),
- input.size() - d_valid_count.value(stream),
- std::move(validity));
+ return output;
}
} // namespace detail
@@ -1021,7 +1124,19 @@ std::unique_ptr<cudf::column> get_json_object(
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
- return detail::get_json_object(input, instructions, stream, mr);
+ CUDF_FUNC_RANGE();
+ return std::move(detail::get_json_object(input, {instructions}, stream, mr).front());
+}
+
+std::vector<std::unique_ptr<cudf::column>> get_json_object_multiple_paths(
+  cudf::strings_column_view const& input,
+  std::vector<std::vector<std::tuple<path_instruction_type, std::string, int32_t>>> const&
+ json_paths,
+ rmm::cuda_stream_view stream,
+ rmm::device_async_resource_ref mr)
+{
+ CUDF_FUNC_RANGE();
+ return detail::get_json_object(input, json_paths, stream, mr);
}
} // namespace spark_rapids_jni
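
To make the shape of the `json_paths` argument concrete, a hedged sketch of how a caller might encode the JSONPath `$.a[0]`; the convention that the string field is unused for INDEX and the integer field is unused for NAMED is an assumption for illustration:

#include "get_json_object.hpp"

#include <cstdint>
#include <string>
#include <tuple>
#include <vector>

using instruction = std::tuple<spark_rapids_jni::path_instruction_type, std::string, int32_t>;

// "$.a[0]" as NAMED("a") followed by INDEX(0).
std::vector<instruction> const path_a0{
  {spark_rapids_jni::path_instruction_type::NAMED, "a", -1},
  {spark_rapids_jni::path_instruction_type::INDEX, "", 0}};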
diff --git a/src/main/cpp/src/get_json_object.hpp b/src/main/cpp/src/get_json_object.hpp
index bb3294b424..963bc91a74 100644
--- a/src/main/cpp/src/get_json_object.hpp
+++ b/src/main/cpp/src/get_json_object.hpp
@@ -16,31 +16,25 @@
#pragma once
-#include
#include
#include
-#include
-#include
-#include
-#include
-
#include
-#include
#include
namespace spark_rapids_jni {
/**
- * path instruction type
+ * @brief Type of instruction in a JSON path.
*/
-enum class path_instruction_type { WILDCARD, INDEX, NAMED };
+enum class path_instruction_type : int8_t { WILDCARD, INDEX, NAMED };
/**
- * Extracts json object from a json string based on json path specified, and
- * returns json string of the extracted json object. It will return null if the
- * input json string is invalid.
+ * @brief Extract JSON object from a JSON string based on the specified JSON path.
+ *
+ * If the input JSON string is invalid, or it does not contain the object at the given path, a null
+ * will be returned.
*/
std::unique_ptr<cudf::column> get_json_object(
cudf::strings_column_view const& input,
@@ -48,4 +42,18 @@ std::unique_ptr<cudf::column> get_json_object(
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());
+/**
+ * @brief Extract multiple JSON objects from a JSON string based on the specified JSON paths.
+ *
+ * This function processes all the JSON paths in parallel, which may be faster than calling
+ * `get_json_object` on each individual JSON path. However, it may consume much more GPU
+ * memory, proportional to the number of JSON paths.
+ */
+std::vector<std::unique_ptr<cudf::column>> get_json_object_multiple_paths(
+  cudf::strings_column_view const& input,
+  std::vector<std::vector<std::tuple<path_instruction_type, std::string, int32_t>>> const&
+ json_paths,
+ rmm::cuda_stream_view stream = cudf::get_default_stream(),
+ rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());
+
} // namespace spark_rapids_jni
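
A short usage sketch for the new multi-path API; the input column and the two single-instruction paths (`$.a` and `$.b`) are assumptions for illustration:

// Extract "$.a" and "$.b" from the same strings column in one pass.
void extract_two_paths(cudf::strings_column_view const& input_strings)
{
  auto results = spark_rapids_jni::get_json_object_multiple_paths(
    input_strings,
    {{{spark_rapids_jni::path_instruction_type::NAMED, "a", -1}},
     {{spark_rapids_jni::path_instruction_type::NAMED, "b", -1}}});
  // results.size() == 2; results[i] is the strings column for path i.
}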
diff --git a/src/main/cpp/src/hash.cuh b/src/main/cpp/src/hash.cuh
index 8cf489a7e7..35969ca42a 100644
--- a/src/main/cpp/src/hash.cuh
+++ b/src/main/cpp/src/hash.cuh
@@ -19,15 +19,9 @@
#include
#include
-#include
-#include
-
#include
namespace spark_rapids_jni {
-
-constexpr int64_t DEFAULT_XXHASH64_SEED = 42;
-
/**
* Normalization of floating point NaNs, passthrough for all other values.
*/
@@ -101,37 +95,4 @@ __device__ __inline__ std::pair<__int128_t, cudf::size_type> to_java_bigdecimal(
return {big_endian_value, length};
}
-
-/**
- * @brief Computes the murmur32 hash value of each row in the input set of columns.
- *
- * @param input The table of columns to hash
- * @param seed Optional seed value to use for the hash function
- * @param stream CUDA stream used for device memory operations and kernel launches
- * @param mr Device memory resource used to allocate the returned column's device memory
- *
- * @returns A column where each row is the hash of a column from the input.
- */
-std::unique_ptr<cudf::column> murmur_hash3_32(
- cudf::table_view const& input,
- uint32_t seed = 0,
- rmm::cuda_stream_view stream = cudf::get_default_stream(),
- rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());
-
-/**
- * @brief Computes the xxhash64 hash value of each row in the input set of columns.
- *
- * @param input The table of columns to hash
- * @param seed Optional seed value to use for the hash function
- * @param stream CUDA stream used for device memory operations and kernel launches
- * @param mr Device memory resource used to allocate the returned column's device memory
- *
- * @returns A column where each row is the hash of a column from the input.
- */
-std::unique_ptr<cudf::column> xxhash64(
- cudf::table_view const& input,
- int64_t seed = DEFAULT_XXHASH64_SEED,
- rmm::cuda_stream_view stream = cudf::get_default_stream(),
- rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());
-
} // namespace spark_rapids_jni
diff --git a/src/main/cpp/src/hash.hpp b/src/main/cpp/src/hash.hpp
new file mode 100644
index 0000000000..4021b9e75c
--- /dev/null
+++ b/src/main/cpp/src/hash.hpp
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include
+#include
+
+namespace spark_rapids_jni {
+
+constexpr int64_t DEFAULT_XXHASH64_SEED = 42;
+
+/**
+ * @brief Computes the murmur32 hash value of each row in the input set of columns.
+ *
+ * @param input The table of columns to hash
+ * @param seed Optional seed value to use for the hash function
+ * @param stream CUDA stream used for device memory operations and kernel launches
+ * @param mr Device memory resource used to allocate the returned column's device memory
+ *
+ * @returns A column where each row is the hash of a column from the input.
+ */
+std::unique_ptr<cudf::column> murmur_hash3_32(
+ cudf::table_view const& input,
+ uint32_t seed = 0,
+ rmm::cuda_stream_view stream = cudf::get_default_stream(),
+ rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());
+
+/**
+ * @brief Computes the xxhash64 hash value of each row in the input set of columns.
+ *
+ * @param input The table of columns to hash
+ * @param seed Optional seed value to use for the hash function
+ * @param stream CUDA stream used for device memory operations and kernel launches
+ * @param mr Device memory resource used to allocate the returned column's device memory
+ *
+ * @returns A column where each row is the hash of a column from the input.
+ */
+std::unique_ptr<cudf::column> xxhash64(
+ cudf::table_view const& input,
+ int64_t seed = DEFAULT_XXHASH64_SEED,
+ rmm::cuda_stream_view stream = cudf::get_default_stream(),
+ rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());
+
+/**
+ * @brief Computes the Hive hash value of each row in the input set of columns.
+ *
+ * @param input The table of columns to hash
+ * @param stream CUDA stream used for device memory operations and kernel launches
+ * @param mr Device memory resource used to allocate the returned column's device memory
+ *
+ * @returns A column where each row is the hash of a column from the input.
+ */
+std::unique_ptr<cudf::column> hive_hash(
+ cudf::table_view const& input,
+ rmm::cuda_stream_view stream = cudf::get_default_stream(),
+ rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());
+
+} // namespace spark_rapids_jni
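
A brief usage sketch for the relocated declarations above; the table view is an assumed, already-built input:

#include "hash.hpp"

#include <cudf/table/table_view.hpp>

// Per-row hashes over the same table with the three Spark-compatible hash
// functions declared in hash.hpp.
void hash_examples(cudf::table_view const& tbl)
{
  auto murmur = spark_rapids_jni::murmur_hash3_32(tbl);  // default seed = 0
  auto xx64   = spark_rapids_jni::xxhash64(tbl);         // default seed = 42
  auto hive   = spark_rapids_jni::hive_hash(tbl);
}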
diff --git a/src/main/cpp/src/hive_hash.cu b/src/main/cpp/src/hive_hash.cu
new file mode 100644
index 0000000000..85598565a9
--- /dev/null
+++ b/src/main/cpp/src/hive_hash.cu
@@ -0,0 +1,255 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hash.cuh"
+
+#include
+#include
+#include
+#include
+
+#include