Skip to content

Commit

Permalink
async condition
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Chang <[email protected]>
  • Loading branch information
mocsharp committed Sep 11, 2024
1 parent 81b68bf commit 88f691c
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ class AppCloud : public AppBase {
void compose() override {
using namespace holoscan;

request_queue_ = make_resource<RequestQueue>("request_queue");
auto request_available_condition = make_condition<AsynchronousCondition>("request_available_condition");
request_queue_ = make_resource<RequestQueue>("request_queue", request_available_condition);
processing_queue_ = make_resource<ProcessingQueue>("processing_queue");
response_queue_ = make_resource<ResponseQueue>("response_queue");

auto grpc_request_op =
make_operator<GrpcRequestOp>("grpc_request_op",
Arg("request_queue") = request_queue_,
Arg("processing_queue") = processing_queue_,
Arg("condition") = request_available_condition,
Arg("allocator") = make_resource<UnboundedAllocator>("pool"));

auto response_condition = make_condition<AsynchronousCondition>("response_condition");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,11 @@ class HoloscanEntityServiceImpl final : public Entity::Service {
auto service = request->service();

if (service == "endoscopy_tool_tracking") {
;
std::string request_id = generate_request_id();
request_queue_->push(request_id, request);
response_queue_->block_until_data_available();
auto response = response_queue_->pop();
holoscan::ops::TensorProto::tensor_to_entity_response(response, reply);
// response_queue_->block_until_data_available();
// auto response = response_queue_->pop();
// holoscan::ops::TensorProto::tensor_to_entity_response(response, reply);
return Status::OK;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,25 +81,30 @@ class GrpcRequestOp : public holoscan::Operator {

GrpcRequestOp() = default;

void start() override { condition_->event_state(AsynchronousEventState::EVENT_WAITING); }
void stop() override { condition_->event_state(AsynchronousEventState::EVENT_NEVER); }

void setup(OperatorSpec& spec) override {
spec.param(request_queue_, "request_queue", "Request Queue", "Incoming gRPC requests.");
spec.param(processing_queue_, "processing_queue", "Processing Queue", "In processing queue.");
spec.param(allocator_, "allocator", "Allocator", "Output Allocator");
spec.param(condition_, "condition", "Asynchronous Condition", "Asynchronous Condition");

spec.output<std::string>("out");
}

void compute(InputContext& op_input, OutputContext& op_output,
ExecutionContext& context) override {
request_queue_->block_until_data_available();
auto request = request_queue_->pop();
processing_queue_->push(std::get<0>(request));
auto tensor = holoscan::ops::TensorProto::entity_request_to_tensor(
std::get<1>(request), context, allocator_.get());
op_output.emit(tensor, "out");
condition_->event_state(AsynchronousEventState::EVENT_WAITING);
}

private:
Parameter<std::shared_ptr<AsynchronousCondition>> condition_;
Parameter<std::shared_ptr<RequestQueue>> request_queue_;
Parameter<std::shared_ptr<ProcessingQueue>> processing_queue_;
Parameter<std::shared_ptr<Allocator>> allocator_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ int main(int argc, char** argv) {
HOLOSCAN_LOG_INFO("Using input data from {}", data_directory);
app->set_datapath(data_directory);

auto scheduler = app->make_scheduler<holoscan::MultiThreadScheduler>(
"event-scheduler",
Arg("worker_thread_number", 3L),
Arg("stop_on_deadlock", false)
);
app->scheduler(scheduler);
app->run();

return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,18 @@ class RequestQueue : public holoscan::Resource {
public:
HOLOSCAN_RESOURCE_FORWARD_ARGS_SUPER(RequestQueue, Resource)

RequestQueue() { queue_ = new queue<tuple<const string, const EntityRequest*>>(); }
explicit RequestQueue(shared_ptr<AsynchronousCondition> request_available_condition)
: request_available_condition_(request_available_condition) {
queue_ = new queue<tuple<const string, const EntityRequest*>>();
}

~RequestQueue() { delete queue_; }

void push(const string request_id, const EntityRequest* request) {
queue_->push(std::make_tuple(request_id, request));
lock_guard<mutex> lock(request_available_mutex_);
request_available_condition_.notify_all();
}

void block_until_data_available() {
unique_lock<mutex> lock(request_available_mutex_);
request_available_condition_.wait(lock, [this] { return this->queue_->size() > 0; });
if (request_available_condition_->event_state() == AsynchronousEventState::EVENT_WAITING) {
request_available_condition_->event_state(AsynchronousEventState::EVENT_DONE);
}
}

tuple<const string, const EntityRequest*> pop() {
Expand All @@ -58,9 +57,8 @@ class RequestQueue : public holoscan::Resource {
}

private:
shared_ptr<AsynchronousCondition> request_available_condition_;
queue<tuple<const string, const EntityRequest*>>* queue_;
condition_variable request_available_condition_;
mutex request_available_mutex_;
};

class ProcessingQueue : public holoscan::Resource {
Expand Down
139 changes: 59 additions & 80 deletions operators/grpc_operators/tensor_proto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,52 +23,30 @@

namespace holoscan::ops {

EntityRequest TensorProto::tensor_to_entity_request(const nvidia::gxf::Entity& gxf_entity) {
EntityRequest request;
auto timestamp = gxf_entity.get<nvidia::gxf::Timestamp>();
if (timestamp) {
request.mutable_timestamp()->set_acqtime((*timestamp)->acqtime);
request.mutable_timestamp()->set_pubtime((*timestamp)->pubtime);
}

auto tensors = gxf_entity.findAll<nvidia::gxf::Tensor, 4>();
if (!tensors) { throw std::runtime_error("Tensor not found"); }
for (auto tensor : tensors.value()) {
holoscan::entity::Tensor& tensor_proto = (*request.mutable_tensors())[tensor->name()];
for (uint32_t i = 0; i < (*tensor)->shape().rank(); i++) {
tensor_proto.add_dimensions((*tensor)->shape().dimension(i));
}
switch ((*tensor)->element_type()) {
case nvidia::gxf::PrimitiveType::kUnsigned8:
tensor_proto.set_primitive_type(holoscan::entity::Tensor::kUnsigned8);
break;
case nvidia::gxf::PrimitiveType::kUnsigned16:
tensor_proto.set_primitive_type(holoscan::entity::Tensor::kUnsigned16);
break;
case nvidia::gxf::PrimitiveType::kFloat32:
tensor_proto.set_primitive_type(holoscan::entity::Tensor::kFloat32);
break;
default:
throw std::runtime_error(fmt::format("Unsupported primitive type: {}",
static_cast<int>((*tensor)->element_type())));
}
tensor_proto.set_data((*tensor)->pointer(), (*tensor)->size());
void TensorProto::gxf_time_to_proto(const nvidia::gxf::Entity& gxf_entity,
::holoscan::entity::Timestamp* timestamp) {
auto gxf_timestamp = gxf_entity.get<nvidia::gxf::Timestamp>();
if (gxf_timestamp) {
timestamp->set_acqtime((*gxf_timestamp)->acqtime);
timestamp->set_pubtime((*gxf_timestamp)->pubtime);
}
return request;
}

void TensorProto::tensor_to_entity_response(const nvidia::gxf::Entity& gxf_entity,
EntityResponse* response) {
auto timestamp = gxf_entity.get<nvidia::gxf::Timestamp>();
if (timestamp) {
response->mutable_timestamp()->set_acqtime((*timestamp)->acqtime);
response->mutable_timestamp()->set_pubtime((*timestamp)->pubtime);
}
// Copies a protobuf Timestamp into a newly added GXF Timestamp component
// named "timestamp" on the given entity.
//
// @param gxf_entity Entity that receives the new Timestamp component.
// @param timestamp  Source protobuf timestamp (acqtime/pubtime in ns).
// @throws std::runtime_error if the Timestamp component cannot be added.
void TensorProto::proto_to_gxf_time(nvidia::gxf::Entity& gxf_entity,
                                    const ::holoscan::entity::Timestamp& timestamp) {
  auto gxf_timestamp = gxf_entity.add<nvidia::gxf::Timestamp>("timestamp");
  // add() returns an Expected; check it before dereferencing, matching the
  // "if (!tensor) throw" pattern used by the tensor conversion helpers.
  if (!gxf_timestamp) { throw std::runtime_error("Failed to add timestamp component"); }
  (*gxf_timestamp)->acqtime = timestamp.acqtime();
  (*gxf_timestamp)->pubtime = timestamp.pubtime();
}

void TensorProto::gxf_tensor_to_proto(
const nvidia::gxf::Entity& gxf_entity,
google::protobuf::Map<std::string, ::holoscan::entity::Tensor>* tensor_map) {
auto tensors = gxf_entity.findAll<nvidia::gxf::Tensor, 4>();
if (!tensors) { throw std::runtime_error("Tensor not found"); }

for (auto tensor : tensors.value()) {
holoscan::entity::Tensor& tensor_proto = (*response->mutable_tensors())[tensor->name()];
holoscan::entity::Tensor& tensor_proto = (*tensor_map)[tensor->name()];
for (uint32_t i = 0; i < (*tensor)->shape().rank(); i++) {
tensor_proto.add_dimensions((*tensor)->shape().dimension(i));
}
Expand All @@ -89,74 +67,75 @@ void TensorProto::tensor_to_entity_response(const nvidia::gxf::Entity& gxf_entit
tensor_proto.set_data((*tensor)->pointer(), (*tensor)->size());
}
}
nvidia::gxf::Entity TensorProto::entity_request_to_tensor(const EntityRequest* entity_request,
ExecutionContext& context,
std::shared_ptr<Allocator> allocator) {
auto gxf_allocator =
nvidia::gxf::Handle<nvidia::gxf::Allocator>::Create(context.context(), allocator->gxf_cid());
auto entity = nvidia::gxf::Entity::New(context.context());
if (!entity) { throw std::runtime_error("Error creating new entity"); }

for (auto tensor_entry : entity_request->tensors()) {
void TensorProto::proto_to_gxf_tensor(
nvidia::gxf::Entity& gxf_entity,
const google::protobuf::Map<std::string, ::holoscan::entity::Tensor>& tensor_map,
nvidia::gxf::Handle<nvidia::gxf::Allocator>& allocator) {
for (auto tensor_entry : tensor_map) {
const holoscan::entity::Tensor& tensor_proto = tensor_entry.second;
auto tensor = entity.value().add<nvidia::gxf::Tensor>(tensor_entry.first.c_str());
auto tensor = gxf_entity.add<nvidia::gxf::Tensor>(tensor_entry.first.c_str());
if (!tensor) { throw std::runtime_error("Failed to create tensor"); }

nvidia::gxf::Shape shape({tensor_proto.dimensions().begin(), tensor_proto.dimensions().end()});
switch (tensor_proto.primitive_type()) {
case holoscan::entity::Tensor::kUnsigned8:
tensor.value()->reshape<uint8_t>(
shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value());
tensor.value()->reshape<uint8_t>(shape, nvidia::gxf::MemoryStorageType::kHost, allocator);
break;
case holoscan::entity::Tensor::kUnsigned16:
tensor.value()->reshape<uint16_t>(
shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value());
tensor.value()->reshape<uint16_t>(shape, nvidia::gxf::MemoryStorageType::kHost, allocator);
break;
case holoscan::entity::Tensor::kFloat32:
tensor.value()->reshape<float>(
shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value());
tensor.value()->reshape<float>(shape, nvidia::gxf::MemoryStorageType::kHost, allocator);
break;
default:
throw std::runtime_error(
fmt::format("Unsupported primitive type: {}", tensor_proto.primitive_type()));
}
std::copy(tensor_proto.data().begin(), tensor_proto.data().end(), tensor.value()->pointer());
}
}

// Builds an EntityRequest protobuf from a GXF entity: the entity's
// Timestamp (if any) and all of its Tensor components are serialized
// into the request via the shared conversion helpers.
//
// @param gxf_entity Source GXF entity to serialize.
// @return Fully populated EntityRequest (returned by value; RVO applies).
EntityRequest TensorProto::tensor_to_entity_request(const nvidia::gxf::Entity& gxf_entity) {
  EntityRequest proto_request;
  gxf_time_to_proto(gxf_entity, proto_request.mutable_timestamp());
  gxf_tensor_to_proto(gxf_entity, proto_request.mutable_tensors());
  return proto_request;
}

// Serializes a GXF entity into an existing EntityResponse protobuf:
// first the entity's Timestamp (if present), then every Tensor component.
//
// @param gxf_entity Source GXF entity to serialize.
// @param response   Destination protobuf message; its timestamp and
//                   tensor map are populated in place.
void TensorProto::tensor_to_entity_response(const nvidia::gxf::Entity& gxf_entity,
                                            EntityResponse* response) {
  gxf_time_to_proto(gxf_entity, response->mutable_timestamp());
  gxf_tensor_to_proto(gxf_entity, response->mutable_tensors());
}

// Deserializes an EntityRequest protobuf into a new GXF entity:
// reconstructs the timestamp component and every tensor, allocating
// tensor memory through the supplied Holoscan allocator.
//
// @param entity_request Incoming request message to deserialize.
// @param context        Execution context providing the GXF context handle.
// @param allocator      Holoscan allocator used for tensor storage.
// @return Newly created GXF entity populated with timestamp and tensors.
// @throws std::runtime_error on entity/allocator-handle creation failure.
nvidia::gxf::Entity TensorProto::entity_request_to_tensor(const EntityRequest* entity_request,
                                                          ExecutionContext& context,
                                                          std::shared_ptr<Allocator> allocator) {
  auto gxf_allocator =
      nvidia::gxf::Handle<nvidia::gxf::Allocator>::Create(context.context(), allocator->gxf_cid());
  // Handle::Create returns an Expected; the original dereferenced it with
  // .value() unchecked while checking `entity` below — guard both the same way.
  if (!gxf_allocator) { throw std::runtime_error("Error creating allocator handle"); }

  auto entity = nvidia::gxf::Entity::New(context.context());
  if (!entity) { throw std::runtime_error("Error creating new entity"); }

  TensorProto::proto_to_gxf_time(entity.value(), entity_request->timestamp());
  TensorProto::proto_to_gxf_tensor(entity.value(), entity_request->tensors(), gxf_allocator.value());

  return entity.value();
}

nvidia::gxf::Entity TensorProto::entity_response_to_tensor(const EntityResponse& entity_response,
ExecutionContext& context,
std::shared_ptr<Allocator> allocator) {
ExecutionContext& context,
std::shared_ptr<Allocator> allocator) {
auto gxf_allocator =
nvidia::gxf::Handle<nvidia::gxf::Allocator>::Create(context.context(), allocator->gxf_cid());
auto entity = nvidia::gxf::Entity::New(context.context());
if (!entity) { throw std::runtime_error("Error creating new entity"); }

for (auto tensor_entry : entity_response.tensors()) {
const holoscan::entity::Tensor& tensor_proto = tensor_entry.second;
auto tensor = entity.value().add<nvidia::gxf::Tensor>(tensor_entry.first.c_str());
if (!tensor) { throw std::runtime_error("Failed to create tensor"); }
nvidia::gxf::Shape shape({tensor_proto.dimensions().begin(), tensor_proto.dimensions().end()});
switch (tensor_proto.primitive_type()) {
case holoscan::entity::Tensor::kUnsigned8:
tensor.value()->reshape<uint8_t>(
shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value());
break;
case holoscan::entity::Tensor::kUnsigned16:
tensor.value()->reshape<uint16_t>(
shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value());
break;
case holoscan::entity::Tensor::kFloat32:
tensor.value()->reshape<float>(
shape, nvidia::gxf::MemoryStorageType::kHost, gxf_allocator.value());
break;
default:
throw std::runtime_error(
fmt::format("Unsupported primitive type: {}", tensor_proto.primitive_type()));
}
std::copy(tensor_proto.data().begin(), tensor_proto.data().end(), tensor.value()->pointer());
}
TensorProto::proto_to_gxf_time(entity.value(), entity_response.timestamp());
TensorProto::proto_to_gxf_tensor(
entity.value(), entity_response.tensors(), gxf_allocator.value());

return entity.value();
}
Expand Down
13 changes: 13 additions & 0 deletions operators/grpc_operators/tensor_proto.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ class TensorProto {
static nvidia::gxf::Entity entity_response_to_tensor(const EntityResponse& entity_request,
ExecutionContext& context,
std::shared_ptr<Allocator> allocator);

private:
static void gxf_time_to_proto(const nvidia::gxf::Entity& gxf_entity,
::holoscan::entity::Timestamp* timestamp);
static void gxf_tensor_to_proto(
const nvidia::gxf::Entity& gxf_entity,
google::protobuf::Map<std::string, ::holoscan::entity::Tensor>* tensor_map);
static void proto_to_gxf_time(nvidia::gxf::Entity& gxf_entity,
const ::holoscan::entity::Timestamp& timestamp);
static void proto_to_gxf_tensor(
nvidia::gxf::Entity& gxf_entity,
const google::protobuf::Map<std::string, ::holoscan::entity::Tensor>& tensor_map,
nvidia::gxf::Handle<nvidia::gxf::Allocator>& allocator);
};

} // namespace holoscan::ops
Expand Down

0 comments on commit 88f691c

Please sign in to comment.