From 88f691ccc0f5ca6c12fcd51ccde53d9234b1ccb0 Mon Sep 17 00:00:00 2001 From: Victor Chang Date: Wed, 11 Sep 2024 22:46:45 +0000 Subject: [PATCH] async condition Signed-off-by: Victor Chang --- .../cpp/app_cloud.hpp | 4 +- .../cpp/entity_server.hpp | 7 +- .../cpp/grpc_ops.hpp | 7 +- .../cpp/main.cpp | 6 + .../cpp/resource_queue.hpp | 18 +-- operators/grpc_operators/tensor_proto.cpp | 139 ++++++++---------- operators/grpc_operators/tensor_proto.hpp | 13 ++ 7 files changed, 98 insertions(+), 96 deletions(-) diff --git a/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/app_cloud.hpp b/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/app_cloud.hpp index e22c0b8c3..c389265e2 100644 --- a/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/app_cloud.hpp +++ b/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/app_cloud.hpp @@ -40,7 +40,8 @@ class AppCloud : public AppBase { void compose() override { using namespace holoscan; - request_queue_ = make_resource("request_queue"); + auto request_available_condition = make_condition("request_available_condition"); + request_queue_ = make_resource("request_queue", request_available_condition); processing_queue_ = make_resource("processing_queue"); response_queue_ = make_resource("response_queue"); @@ -48,6 +49,7 @@ class AppCloud : public AppBase { make_operator("grpc_request_op", Arg("request_queue") = request_queue_, Arg("processing_queue") = processing_queue_, + Arg("condition") = request_available_condition, Arg("allocator") = make_resource("pool")); auto response_condition = make_condition("response_condition"); diff --git a/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/entity_server.hpp b/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/entity_server.hpp index 83d3e3b00..804a8deee 100644 --- a/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/entity_server.hpp +++ b/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/entity_server.hpp @@ -55,12 +55,11 @@ class HoloscanEntityServiceImpl final : public Entity::Service { auto service = request->service(); if (service == "endoscopy_tool_tracking") { - ; std::string request_id = generate_request_id(); request_queue_->push(request_id, request); - response_queue_->block_until_data_available(); - auto response = response_queue_->pop(); - holoscan::ops::TensorProto::tensor_to_entity_response(response, reply); + // response_queue_->block_until_data_available(); + // auto response = response_queue_->pop(); + // holoscan::ops::TensorProto::tensor_to_entity_response(response, reply); return Status::OK; } diff --git a/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/grpc_ops.hpp b/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/grpc_ops.hpp index 64ecf6b71..8cb003713 100644 --- a/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/grpc_ops.hpp +++ b/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/grpc_ops.hpp @@ -81,25 +81,30 @@ class GrpcRequestOp : public holoscan::Operator { GrpcRequestOp() = default; + void start() override { condition_->event_state(AsynchronousEventState::EVENT_WAITING); } + void stop() override { condition_->event_state(AsynchronousEventState::EVENT_NEVER); } + void setup(OperatorSpec& spec) override { spec.param(request_queue_, "request_queue", "Request Queue", "Incoming gRPC requests."); spec.param(processing_queue_, "processing_queue", "Processing Queue", "In processing queue."); spec.param(allocator_, "allocator", "Allocator", "Output Allocator"); + spec.param(condition_, "condition", "Asynchronous Condition", "Asynchronous Condition"); spec.output("out"); } void compute(InputContext& op_input, OutputContext& op_output, ExecutionContext& context) override { - request_queue_->block_until_data_available(); auto request = request_queue_->pop(); processing_queue_->push(std::get<0>(request)); auto tensor = holoscan::ops::TensorProto::entity_request_to_tensor( std::get<1>(request), context, allocator_.get()); op_output.emit(tensor, "out"); + condition_->event_state(AsynchronousEventState::EVENT_WAITING); } private: + Parameter> condition_; Parameter> request_queue_; Parameter> processing_queue_; Parameter> allocator_; diff --git a/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/main.cpp b/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/main.cpp index 31525aa21..18787f9a1 100644 --- a/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/main.cpp +++ b/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/main.cpp @@ -109,6 +109,12 @@ int main(int argc, char** argv) { HOLOSCAN_LOG_INFO("Using input data from {}", data_directory); app->set_datapath(data_directory); + auto scheduler = app->make_scheduler( + "event-scheduler", + Arg("worker_thread_number", 3L), + Arg("stop_on_deadlock", false) + ); + app->scheduler(scheduler); app->run(); return 0; diff --git a/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/resource_queue.hpp b/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/resource_queue.hpp index 112536a1e..a914fca31 100644 --- a/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/resource_queue.hpp +++ b/applications/h264/grpc_h264_endoscopy_tool_tracking/cpp/resource_queue.hpp @@ -36,19 +36,18 @@ class RequestQueue : public holoscan::Resource { public: HOLOSCAN_RESOURCE_FORWARD_ARGS_SUPER(RequestQueue, Resource) - RequestQueue() { queue_ = new queue>(); } + explicit RequestQueue(shared_ptr request_available_condition) + : request_available_condition_(request_available_condition) { + queue_ = new queue>(); + } ~RequestQueue() { delete queue_; } void push(const string request_id, const EntityRequest* request) { queue_->push(std::make_tuple(request_id, request)); - lock_guard lock(request_available_mutex_); - request_available_condition_.notify_all(); - } - - void block_until_data_available() { - unique_lock lock(request_available_mutex_); - request_available_condition_.wait(lock, [this] { return this->queue_->size() > 0; }); + if (request_available_condition_->event_state() == AsynchronousEventState::EVENT_WAITING) { + request_available_condition_->event_state(AsynchronousEventState::EVENT_DONE); + } } tuple pop() { @@ -58,9 +57,8 @@ class RequestQueue : public holoscan::Resource { } private: + shared_ptr request_available_condition_; queue>* queue_; - condition_variable request_available_condition_; - mutex request_available_mutex_; }; class ProcessingQueue : public holoscan::Resource { diff --git a/operators/grpc_operators/tensor_proto.cpp b/operators/grpc_operators/tensor_proto.cpp index 854fbe8af..31949c4d1 100644 --- a/operators/grpc_operators/tensor_proto.cpp +++ b/operators/grpc_operators/tensor_proto.cpp @@ -23,52 +23,30 @@ namespace holoscan::ops { -EntityRequest TensorProto::tensor_to_entity_request(const nvidia::gxf::Entity& gxf_entity) { - EntityRequest request; - auto timestamp = gxf_entity.get(); - if (timestamp) { - request.mutable_timestamp()->set_acqtime((*timestamp)->acqtime); - request.mutable_timestamp()->set_pubtime((*timestamp)->pubtime); - } - - auto tensors = gxf_entity.findAll(); - if (!tensors) { throw std::runtime_error("Tensor not found"); } - for (auto tensor : tensors.value()) { - holoscan::entity::Tensor& tensor_proto = (*request.mutable_tensors())[tensor->name()]; - for (uint32_t i = 0; i < (*tensor)->shape().rank(); i++) { - tensor_proto.add_dimensions((*tensor)->shape().dimension(i)); - } - switch ((*tensor)->element_type()) { - case nvidia::gxf::PrimitiveType::kUnsigned8: - tensor_proto.set_primitive_type(holoscan::entity::Tensor::kUnsigned8); - break; - case nvidia::gxf::PrimitiveType::kUnsigned16: - tensor_proto.set_primitive_type(holoscan::entity::Tensor::kUnsigned16); - break; - case nvidia::gxf::PrimitiveType::kFloat32: - tensor_proto.set_primitive_type(holoscan::entity::Tensor::kFloat32); - break; - default: - throw std::runtime_error(fmt::format("Unsupported primitive type: {}", - static_cast((*tensor)->element_type()))); - } - tensor_proto.set_data((*tensor)->pointer(), (*tensor)->size()); +void TensorProto::gxf_time_to_proto(const nvidia::gxf::Entity& gxf_entity, + ::holoscan::entity::Timestamp* timestamp) { + auto gxf_timestamp = gxf_entity.get(); + if (gxf_timestamp) { + timestamp->set_acqtime((*gxf_timestamp)->acqtime); + timestamp->set_pubtime((*gxf_timestamp)->pubtime); } - return request; } -void TensorProto::tensor_to_entity_response(const nvidia::gxf::Entity& gxf_entity, - EntityResponse* response) { - auto timestamp = gxf_entity.get(); - if (timestamp) { - response->mutable_timestamp()->set_acqtime((*timestamp)->acqtime); - response->mutable_timestamp()->set_pubtime((*timestamp)->pubtime); - } +void TensorProto::proto_to_gxf_time(nvidia::gxf::Entity& gxf_entity, + const ::holoscan::entity::Timestamp& timestamp) { + auto gxf_timestamp = gxf_entity.add("timestamp"); + (*gxf_timestamp)->acqtime = timestamp.acqtime(); + (*gxf_timestamp)->pubtime = timestamp.pubtime(); +} +void TensorProto::gxf_tensor_to_proto( + const nvidia::gxf::Entity& gxf_entity, + google::protobuf::Map* tensor_map) { auto tensors = gxf_entity.findAll(); if (!tensors) { throw std::runtime_error("Tensor not found"); } + for (auto tensor : tensors.value()) { - holoscan::entity::Tensor& tensor_proto = (*response->mutable_tensors())[tensor->name()]; + holoscan::entity::Tensor& tensor_proto = (*tensor_map)[tensor->name()]; for (uint32_t i = 0; i < (*tensor)->shape().rank(); i++) { tensor_proto.add_dimensions((*tensor)->shape().dimension(i)); } @@ -89,31 +67,26 @@ void TensorProto::tensor_to_entity_response(const nvidia::gxf::Entity& gxf_entit tensor_proto.set_data((*tensor)->pointer(), (*tensor)->size()); } } -nvidia::gxf::Entity TensorProto::entity_request_to_tensor(const EntityRequest* entity_request, - ExecutionContext& context, - std::shared_ptr allocator) { - auto gxf_allocator = - nvidia::gxf::Handle::Create(context.context(), allocator->gxf_cid()); - auto entity = nvidia::gxf::Entity::New(context.context()); - if (!entity) { throw std::runtime_error("Error creating new entity"); } - for (auto tensor_entry : entity_request->tensors()) { +void TensorProto::proto_to_gxf_tensor( + nvidia::gxf::Entity& gxf_entity, + const google::protobuf::Map& tensor_map, + nvidia::gxf::Handle& allocator) { + for (auto tensor_entry : tensor_map) { const holoscan::entity::Tensor& tensor_proto = tensor_entry.second; - auto tensor = entity.value().add(tensor_entry.first.c_str()); + auto tensor = gxf_entity.add(tensor_entry.first.c_str()); if (!tensor) { throw std::runtime_error("Failed to create tensor"); } + nvidia::gxf::Shape shape({tensor_proto.dimensions().begin(), tensor_proto.dimensions().end()}); switch (tensor_proto.primitive_type()) { case holoscan::entity::Tensor::kUnsigned8: - tensor.value()->reshape( - shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value()); + tensor.value()->reshape(shape, nvidia::gxf::MemoryStorageType::kHost, allocator); break; case holoscan::entity::Tensor::kUnsigned16: - tensor.value()->reshape( - shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value()); + tensor.value()->reshape(shape, nvidia::gxf::MemoryStorageType::kHost, allocator); break; case holoscan::entity::Tensor::kFloat32: - tensor.value()->reshape( - shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value()); + tensor.value()->reshape(shape, nvidia::gxf::MemoryStorageType::kHost, allocator); break; default: throw std::runtime_error( @@ -121,42 +94,48 @@ nvidia::gxf::Entity TensorProto::entity_request_to_tensor(const EntityRequest* e } std::copy(tensor_proto.data().begin(), tensor_proto.data().end(), tensor.value()->pointer()); } +} + +EntityRequest TensorProto::tensor_to_entity_request(const nvidia::gxf::Entity& gxf_entity) { + EntityRequest request; + TensorProto::gxf_time_to_proto(gxf_entity, request.mutable_timestamp()); + TensorProto::gxf_tensor_to_proto(gxf_entity, request.mutable_tensors()); + + return request; +} + +void TensorProto::tensor_to_entity_response(const nvidia::gxf::Entity& gxf_entity, + EntityResponse* response) { + TensorProto::gxf_time_to_proto(gxf_entity, response->mutable_timestamp()); + TensorProto::gxf_tensor_to_proto(gxf_entity, response->mutable_tensors()); +} + +nvidia::gxf::Entity TensorProto::entity_request_to_tensor(const EntityRequest* entity_request, + ExecutionContext& context, + std::shared_ptr allocator) { + auto gxf_allocator = + nvidia::gxf::Handle::Create(context.context(), allocator->gxf_cid()); + auto entity = nvidia::gxf::Entity::New(context.context()); + if (!entity) { throw std::runtime_error("Error creating new entity"); } + + TensorProto::proto_to_gxf_time(entity.value(), entity_request->timestamp()); + TensorProto::proto_to_gxf_tensor( + entity.value(), entity_request->tensors(), gxf_allocator.value()); return entity.value(); } nvidia::gxf::Entity TensorProto::entity_response_to_tensor(const EntityResponse& entity_response, - ExecutionContext& context, - std::shared_ptr allocator) { + ExecutionContext& context, + std::shared_ptr allocator) { auto gxf_allocator = nvidia::gxf::Handle::Create(context.context(), allocator->gxf_cid()); auto entity = nvidia::gxf::Entity::New(context.context()); if (!entity) { throw std::runtime_error("Error creating new entity"); } - for (auto tensor_entry : entity_response.tensors()) { - const holoscan::entity::Tensor& tensor_proto = tensor_entry.second; - auto tensor = entity.value().add(tensor_entry.first.c_str()); - if (!tensor) { throw std::runtime_error("Failed to create tensor"); } - nvidia::gxf::Shape shape({tensor_proto.dimensions().begin(), tensor_proto.dimensions().end()}); - switch (tensor_proto.primitive_type()) { - case holoscan::entity::Tensor::kUnsigned8: - tensor.value()->reshape( - shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value()); - break; - case holoscan::entity::Tensor::kUnsigned16: - tensor.value()->reshape( - shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value()); - break; - case holoscan::entity::Tensor::kFloat32: - tensor.value()->reshape( - shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value()); - break; - default: - throw std::runtime_error( - fmt::format("Unsupported primitive type: {}", tensor_proto.primitive_type())); - } - std::copy(tensor_proto.data().begin(), tensor_proto.data().end(), tensor.value()->pointer()); - } + TensorProto::proto_to_gxf_time(entity.value(), entity_response.timestamp()); + TensorProto::proto_to_gxf_tensor( + entity.value(), entity_response.tensors(), gxf_allocator.value()); return entity.value(); } diff --git a/operators/grpc_operators/tensor_proto.hpp b/operators/grpc_operators/tensor_proto.hpp index 23a67d164..4028be18e 100644 --- a/operators/grpc_operators/tensor_proto.hpp +++ b/operators/grpc_operators/tensor_proto.hpp @@ -25,6 +25,19 @@ class TensorProto { static nvidia::gxf::Entity entity_response_to_tensor(const EntityResponse& entity_request, ExecutionContext& context, std::shared_ptr allocator); + + private: + static void gxf_time_to_proto(const nvidia::gxf::Entity& gxf_entity, + ::holoscan::entity::Timestamp* timestamp); + static void gxf_tensor_to_proto( + const nvidia::gxf::Entity& gxf_entity, + google::protobuf::Map* tensor_map); + static void proto_to_gxf_time(nvidia::gxf::Entity& gxf_entity, + const ::holoscan::entity::Timestamp& timestamp); + static void proto_to_gxf_tensor( + nvidia::gxf::Entity& gxf_entity, + const google::protobuf::Map& tensor_map, + nvidia::gxf::Handle& allocator); }; } // namespace holoscan::ops