diff --git a/qa/L0_decoupled/test.sh b/qa/L0_decoupled/test.sh index 22c37dff49..20ca0fffa3 100755 --- a/qa/L0_decoupled/test.sh +++ b/qa/L0_decoupled/test.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -127,6 +127,45 @@ for trial in $TRIALS; do kill $SERVER_PID wait $SERVER_PID + + SERVER_ARGS="--model-repository=$MODELDIR --grpc-max-response-pool-size=1" + SERVER_LOG="grpc_max_response_pool_size_1_${trial}_server.log" + CLIENT_LOG="grpc_max_response_pool_size_1_${trial}_client.log" + run_server + if [ "$SERVER_PID" == "0" ]; then + echo -e "\n***\n*** Failed to start $SERVER\n***" + cat $SERVER_LOG + exit 1 + fi + + for test in \ + test_one_to_none \ + test_one_to_one \ + test_one_to_many \ + test_no_streaming \ + test_response_order \ + test_wrong_shape; do + + echo "Test: $test" >>$CLIENT_LOG + set +e + python $DECOUPLED_TEST DecoupledTest.$test >>$CLIENT_LOG 2>&1 + if [ $? -ne 0 ]; then + echo -e "\n***\n*** Test grpc-max-response-pool-size=1 ${trial} - $test Failed\n***" >>$CLIENT_LOG + echo -e "\n***\n*** Test grpc-max-response-pool-size=1 ${trial} - $test Failed\n***" + RET=1 + else + check_test_results $TEST_RESULT_FILE 1 + if [ $? -ne 0 ]; then + cat $CLIENT_LOG + echo -e "\n***\n*** Test Result Verification Failed\n***" + RET=1 + fi + fi + set -e + done + + kill $SERVER_PID + wait $SERVER_PID done # Test the server frontend can merge the responses of non-decoupled model that diff --git a/src/command_line_parser.cc b/src/command_line_parser.cc index 53a103d33b..0c113e38a9 100644 --- a/src/command_line_parser.cc +++ b/src/command_line_parser.cc @@ -1,4 +1,4 @@ -// Copyright 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -306,6 +306,7 @@ enum TritonOptionId { OPTION_GRPC_ADDRESS, OPTION_GRPC_HEADER_FORWARD_PATTERN, OPTION_GRPC_INFER_ALLOCATION_POOL_SIZE, + OPTION_GRPC_MAX_RESPONSE_POOL_SIZE, OPTION_GRPC_USE_SSL, OPTION_GRPC_USE_SSL_MUTUAL, OPTION_GRPC_SERVER_CERT, @@ -536,6 +537,11 @@ TritonParser::SetupOptions() "allocated for reuse. As long as the number of in-flight requests " "doesn't exceed this value there will be no allocation/deallocation of " "request/response objects."}); + grpc_options_.push_back( + {OPTION_GRPC_MAX_RESPONSE_POOL_SIZE, "grpc-max-response-pool-size", + Option::ArgInt, + "The maximum number of inference response objects that can remain " + "allocated in the pool at any given time."}); grpc_options_.push_back( {OPTION_GRPC_USE_SSL, "grpc-use-ssl", Option::ArgBool, "Use SSL authentication for GRPC requests. Default is false."}); @@ -1438,6 +1444,14 @@ TritonParser::Parse(int argc, char** argv) case OPTION_GRPC_INFER_ALLOCATION_POOL_SIZE: lgrpc_options.infer_allocation_pool_size_ = ParseOption(optarg); break; + case OPTION_GRPC_MAX_RESPONSE_POOL_SIZE: + lgrpc_options.max_response_pool_size_ = ParseOption(optarg); + if (lgrpc_options.max_response_pool_size_ <= 0) { + throw ParseException( + "Error: --grpc-max-response-pool-size must be greater " + "than 0."); + } + break; case OPTION_GRPC_USE_SSL: lgrpc_options.ssl_.use_ssl_ = ParseOption(optarg); break; diff --git a/src/grpc/grpc_server.cc b/src/grpc/grpc_server.cc index 74ec443ae6..5beb3aba72 100644 --- a/src/grpc/grpc_server.cc +++ b/src/grpc/grpc_server.cc @@ -1,4 +1,4 @@ -// Copyright 2019-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2019-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -2395,8 +2395,8 @@ Server::Server( "ModelInferHandler", tritonserver_, trace_manager_, shm_manager_, &service_, model_infer_cq_.get(), options.infer_allocation_pool_size_ /* max_state_bucket_count */, - options.infer_compression_level_, restricted_kv, - options.forward_header_pattern_)); + options.max_response_pool_size_, options.infer_compression_level_, + restricted_kv, options.forward_header_pattern_)); } // Handler for streaming inference requests. Keeps one handler for streaming @@ -2405,8 +2405,8 @@ Server::Server( "ModelStreamInferHandler", tritonserver_, trace_manager_, shm_manager_, &service_, model_stream_infer_cq_.get(), options.infer_allocation_pool_size_ /* max_state_bucket_count */, - options.infer_compression_level_, restricted_kv, - options.forward_header_pattern_)); + options.max_response_pool_size_, options.infer_compression_level_, + restricted_kv, options.forward_header_pattern_)); } Server::~Server() @@ -2472,6 +2472,8 @@ Server::GetOptions(Options& options, UnorderedMapType& options_map) RETURN_IF_ERR(GetValue( options_map, "infer_allocation_pool_size", &options.infer_allocation_pool_size_)); + RETURN_IF_ERR(GetValue( + options_map, "max_response_pool_size", &options.max_response_pool_size_)); RETURN_IF_ERR(GetValue( options_map, "forward_header_pattern", &options.forward_header_pattern_)); diff --git a/src/grpc/grpc_server.h b/src/grpc/grpc_server.h index 89d8dc7388..f5ec5f87cd 100644 --- a/src/grpc/grpc_server.h +++ b/src/grpc/grpc_server.h @@ -1,4 +1,4 @@ -// Copyright 2019-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2019-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -89,6 +89,7 @@ struct Options { // requests doesn't exceed this value there will be no // allocation/deallocation of request/response objects. int infer_allocation_pool_size_{8}; + int max_response_pool_size_{INT_MAX}; RestrictedFeatures restricted_protocols_; std::string forward_header_pattern_; }; diff --git a/src/grpc/infer_handler.h b/src/grpc/infer_handler.h index 86428a514e..ef259a5d55 100644 --- a/src/grpc/infer_handler.h +++ b/src/grpc/infer_handler.h @@ -1,4 +1,4 @@ -// Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -102,30 +102,44 @@ struct RequestReleasePayload final { // // ResponseQueue // -// A simple queue holding the responses to be written. Uses a -// vector of persistent message objects to prevent allocating -// memory for each response to be written. +// This class implements a queue to manage responses that need to be written. +// It internally uses a reusable pool of persistent message objects to avoid +// allocating memory for each response individually. // template class ResponseQueue { public: - explicit ResponseQueue() { Reset(); } + explicit ResponseQueue(const size_t max_response_queue_size) + : max_response_queue_size_(max_response_queue_size) + { + Reset(); + } ~ResponseQueue() { + // Delete all responses in the reusable pool + for (auto response : reusable_pool_) { + delete response; + } + + // Delete all responses currently in the queue for (auto response : responses_) { delete response; } } - // Resets the queue + // Resets the queue to its initial state void Reset() { + std::lock_guard lock(mtx_); alloc_count_ = 0; ready_count_ = 0; - current_index_ = 0; - for (auto response : responses_) { - response->Clear(); + pop_count_ = 0; + + while (!responses_.empty()) { + responses_.front()->Clear(); + reusable_pool_.push_back(responses_.front()); + responses_.pop_front(); } } @@ -137,17 +151,29 @@ class ResponseQueue { std::lock_guard lock(mtx_); alloc_count_ = 1; if (responses_.size() < 1) { - responses_.push_back(new ResponseType()); + if (!reusable_pool_.empty()) { + responses_.push_back(reusable_pool_.front()); + reusable_pool_.pop_front(); + } else { + responses_.push_back(new ResponseType()); + } } return responses_[0]; } - // Allocates a response on the head of the queue + // Allocates a response at the end of the queue void AllocateResponse() { - std::lock_guard lock(mtx_); + std::unique_lock lock(mtx_); + cv_.wait( + lock, [this] { return responses_.size() < max_response_queue_size_; }); alloc_count_++; - if (responses_.size() < alloc_count_) { + + // Use a response from the reusable pool if available + if (!reusable_pool_.empty()) { + responses_.push_back(reusable_pool_.front()); + reusable_pool_.pop_front(); + } else { responses_.push_back(new ResponseType()); } } @@ -156,12 +182,15 @@ class ResponseQueue { ResponseType* GetLastAllocatedResponse() { std::lock_guard lock(mtx_); - if (responses_.size() < alloc_count_) { + + // Ensure that the requested response has been allocated + if ((responses_.size() + pop_count_) < alloc_count_) { LOG_ERROR << "[INTERNAL] Attempting to access the response not yet allocated"; return nullptr; } - return responses_[alloc_count_ - 1]; + + return responses_.back(); } // Marks the next non-ready response complete @@ -178,43 +207,73 @@ class ResponseQueue { return true; } - // Gets the current response from the tail of - // the queue. + // Gets the current response from the front of the queue ResponseType* GetCurrentResponse() { std::lock_guard lock(mtx_); - if (current_index_ >= ready_count_) { + if (pop_count_ >= ready_count_) { LOG_ERROR << "[INTERNAL] Attempting to access current response when it " "is not ready"; return nullptr; } - return responses_[current_index_]; + if (responses_.empty()) { + LOG_ERROR << "[INTERNAL] No responses are available in the queue."; + return nullptr; + } + + return responses_.front(); } // Gets the response at the specified index ResponseType* GetResponseAt(const uint32_t index) { std::lock_guard lock(mtx_); + + // Check if the index is valid for allocated responses if (index >= alloc_count_) { LOG_ERROR << "[INTERNAL] Attempting to access response which is not yet " "allocated"; return nullptr; } - return responses_[index]; + if (index < pop_count_) { + LOG_ERROR << "[INTERNAL] Attempting to access a response that has " + "already been removed from the queue."; + return nullptr; + } + + // Adjust index based on number of popped responses to get actual index in + // 'responses_' + return responses_[index - pop_count_]; } - // Pops the response from the tail of the queue + // Removes the current response from the front of the queue void PopResponse() { std::lock_guard lock(mtx_); - current_index_++; + + // Ensure there are responses in the queue to pop + if (responses_.empty()) { + LOG_ERROR << "[INTERNAL] No responses in the queue to pop."; + return; + } + + // Clear and move the current response to the reusable pool + auto response = responses_.front(); + response->Clear(); + reusable_pool_.push_back(response); + responses_.pop_front(); + pop_count_++; + + cv_.notify_one(); } // Returns whether the queue is empty bool IsEmpty() { std::lock_guard lock(mtx_); - return ((alloc_count_ == ready_count_) && (alloc_count_ == current_index_)); + return ( + (alloc_count_ == ready_count_) && (alloc_count_ == pop_count_) && + responses_.empty()); } // Returns whether the queue has responses @@ -222,20 +281,23 @@ class ResponseQueue { bool HasReadyResponse() { std::lock_guard lock(mtx_); - return (ready_count_ > current_index_); + return (ready_count_ > pop_count_); } private: - std::vector responses_; + // Stores responses that need to be written. The front of the queue indicates + // the current response, while the back indicates the last allocated response. + std::deque responses_; + // Stores completed responses that can be reused + std::deque reusable_pool_; + std::condition_variable cv_; + size_t max_response_queue_size_; std::mutex mtx_; - // There are three indices to track the responses in the queue - // Tracks the allocated response - uint32_t alloc_count_; - // Tracks the response that is ready to be written - uint32_t ready_count_; - // Tracks the response next in the queue to be written - uint32_t current_index_; + // Three counters are used to track and manage responses in the queue + uint32_t alloc_count_; // Number of allocated responses + uint32_t ready_count_; // Number of ready-to-write responses + uint32_t pop_count_; // Number of removed responses from the queue }; @@ -1070,7 +1132,7 @@ class InferHandlerState { } explicit InferHandlerState( - TRITONSERVER_Server* tritonserver, + TRITONSERVER_Server* tritonserver, const size_t max_response_queue_size, const std::shared_ptr& context, Steps start_step = Steps::START) : tritonserver_(tritonserver), async_notify_state_(false) { @@ -1084,7 +1146,8 @@ class InferHandlerState { delay_response_completion_ms_ = ParseDebugVariable("TRITONSERVER_DELAY_RESPONSE_COMPLETION"); - response_queue_.reset(new ResponseQueue()); + response_queue_.reset( + new ResponseQueue(max_response_queue_size)); Reset(context, start_step); } @@ -1237,7 +1300,7 @@ class InferHandler : public HandlerBase { const std::string& name, const std::shared_ptr& tritonserver, ServiceType* service, ::grpc::ServerCompletionQueue* cq, - size_t max_state_bucket_count, + size_t max_state_bucket_count, size_t max_response_queue_size, std::pair restricted_kv, const std::string& header_forward_pattern); virtual ~InferHandler(); @@ -1274,7 +1337,8 @@ class InferHandler : public HandlerBase { } if (state == nullptr) { - state = new State(tritonserver, context, start_step); + state = new State( + tritonserver, max_response_queue_size_, context, start_step); } if (start_step == Steps::START) { @@ -1375,6 +1439,7 @@ class InferHandler : public HandlerBase { const size_t max_state_bucket_count_; std::vector state_bucket_; + const size_t max_response_queue_size_; std::pair restricted_kv_; std::string header_forward_pattern_; re2::RE2 header_forward_regex_; @@ -1388,11 +1453,12 @@ InferHandler:: const std::string& name, const std::shared_ptr& tritonserver, ServiceType* service, ::grpc::ServerCompletionQueue* cq, - size_t max_state_bucket_count, + size_t max_state_bucket_count, size_t max_response_queue_size, std::pair restricted_kv, const std::string& header_forward_pattern) : name_(name), tritonserver_(tritonserver), service_(service), cq_(cq), max_state_bucket_count_(max_state_bucket_count), + max_response_queue_size_(max_response_queue_size), restricted_kv_(restricted_kv), header_forward_pattern_(header_forward_pattern), header_forward_regex_(header_forward_pattern_) @@ -1548,12 +1614,12 @@ class ModelInferHandler const std::shared_ptr& shm_manager, inference::GRPCInferenceService::AsyncService* service, ::grpc::ServerCompletionQueue* cq, size_t max_state_bucket_count, - grpc_compression_level compression_level, + size_t max_response_queue_size, grpc_compression_level compression_level, std::pair restricted_kv, const std::string& forward_header_pattern) : InferHandler( name, tritonserver, service, cq, max_state_bucket_count, - restricted_kv, forward_header_pattern), + max_response_queue_size, restricted_kv, forward_header_pattern), trace_manager_(trace_manager), shm_manager_(shm_manager), compression_level_(compression_level) { diff --git a/src/grpc/stream_infer_handler.h b/src/grpc/stream_infer_handler.h index 40a8346703..75c51b0ec7 100644 --- a/src/grpc/stream_infer_handler.h +++ b/src/grpc/stream_infer_handler.h @@ -1,4 +1,4 @@ -// Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -71,12 +71,12 @@ class ModelStreamInferHandler const std::shared_ptr& shm_manager, inference::GRPCInferenceService::AsyncService* service, ::grpc::ServerCompletionQueue* cq, size_t max_state_bucket_count, - grpc_compression_level compression_level, + size_t max_response_queue_size, grpc_compression_level compression_level, std::pair restricted_kv, const std::string& header_forward_pattern) : InferHandler( name, tritonserver, service, cq, max_state_bucket_count, - restricted_kv, header_forward_pattern), + max_response_queue_size, restricted_kv, header_forward_pattern), trace_manager_(trace_manager), shm_manager_(shm_manager), compression_level_(compression_level) {