Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Chang <[email protected]>
  • Loading branch information
mocsharp committed Oct 15, 2024
1 parent e8a1f58 commit 8cd1cd2
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 65 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@
},
{
"name": "HOLOSCAN_LOG_LEVEL",
"value": "DEBUG"
"value": "INFO"
},
{
"name": "GRPC_TRACE",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ class AppCloud : public AppBase {
Arg("device_allocator") = make_resource<UnboundedAllocator>("device_allocator"),
Arg("host_allocator") = make_resource<UnboundedAllocator>("host_allocator"));

auto grpc_response = make_operator<GrpcServerResponseOp>(
"grpc_response", Arg("response_queue") = response_queue_);

auto grpc_response =
make_operator<GrpcServerResponseOp>("grpc_response",
from_config("grpc_server_response"),
Arg("response_queue") = response_queue_);

add_flow(grpc_request_op, video_decoder_request, {{"output", "input_frame"}});

Expand All @@ -113,7 +114,7 @@ class AppCloud : public AppBase {
add_flow(lstm_inferer, tool_tracking_postprocessor, {{"tensor", "in"}});
add_flow(tool_tracking_postprocessor,
grpc_response,
{{"out_coords", "input"}});
{{"out_coords", "system"}, {"out_mask", "device"}});
}

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,20 @@ lstm_inference:

tool_tracking_postprocessor:

grpc_server_response:
device_to_system_tensors:
- mask

holoviz:
tensors:
- name: ""
type: color
opacity: 1.0
priority: 0
# - name: mask
# type: color
# opacity: 1.0
# priority: 1
- name: mask
type: color
opacity: 1.0
priority: 1
- name: scaled_coords
type: crosses
opacity: 1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class EntityClient {
~EntityStreamInternal() { writer_thread_.join(); }

void OnWriteDone(bool ok) override {
if (!ok) { HOLOSCAN_LOG_WARN("grpc: write failed"); }
if (!ok) { HOLOSCAN_LOG_WARN("grpc: write failed, error trasnmitting request"); }
write_mutext_.unlock();
}

Expand All @@ -85,7 +85,7 @@ class EntityClient {
auto entity = response_cb_(response_);

client_->response_queue_->push(entity);
HOLOSCAN_LOG_INFO("grpc: Response received and queued");
HOLOSCAN_LOG_INFO("grpc: Response received and queued for display");
Read();
}
}
Expand All @@ -112,7 +112,7 @@ class EntityClient {
request = client_->request_queue_->pop();
request->set_service("endoscopy_tool_tracking");
StartWrite(&*request);
HOLOSCAN_LOG_INFO("grpc: Sending request");
HOLOSCAN_LOG_INFO("grpc: Sending request to server");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ class HoloscanEntityServiceImpl final : public Entity::CallbackService {
}

void OnWriteDone(bool ok) override {
if (!ok) { HOLOSCAN_LOG_WARN("grpc: write failed"); }
if (!ok) { HOLOSCAN_LOG_WARN("grpc: write failed, error writing response"); }
write_mutext_.unlock();
}

void OnReadDone(bool ok) override {
if (ok) {
auto entity = server_->request_cb_(request_);
server_->request_queue_->push(entity);
HOLOSCAN_LOG_INFO("grpc: Request received and queued");
HOLOSCAN_LOG_INFO("grpc: Request received and queued for processing");
Read();
}
}
Expand All @@ -97,7 +97,7 @@ class HoloscanEntityServiceImpl final : public Entity::CallbackService {
std::shared_ptr<EntityResponse> response;
response = server_->response_queue_->pop();
StartWrite(&*response);
HOLOSCAN_LOG_INFO("grpc: Sending response");
HOLOSCAN_LOG_INFO("grpc: Sending response to client");
}
}
void ProcessOutgoingQueue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class GrpcClientRequestOp : public holoscan::Operator {
auto request = std::make_shared<EntityRequest>();
holoscan::ops::TensorProto::tensor_to_entity_request(maybe_input_message.value(), request);
request_queue_->push(request);
HOLOSCAN_LOG_INFO("grpc: request converted and queued");
HOLOSCAN_LOG_INFO("grpc: request converted and queued for transmission");
}

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,40 @@ class GrpcServerResponseOp : public holoscan::Operator {
GrpcServerResponseOp() = default;

void setup(OperatorSpec& spec) override {
spec.input<nvidia::gxf::Entity>("input");
spec.input<nvidia::gxf::Entity>("system");
spec.input<nvidia::gxf::Entity>("device");

spec.param(device_to_system_tensors_,
"device_to_system_tensors",
"Device Memory to System Memory",
"Copies tensors from device memory to sytem memory.");
spec.param(response_queue_, "response_queue", "Response Queue", "Outgoing gRPC results.");
}

void compute(InputContext& op_input, OutputContext& op_output,
ExecutionContext& context) override {
auto maybe_input_message = op_input.receive<holoscan::gxf::Entity>("input");
if (!maybe_input_message) {
HOLOSCAN_LOG_ERROR("grpc: Failed to receive input message");
return;
}
auto tensors = 0;
auto response = std::make_shared<EntityResponse>();
holoscan::ops::TensorProto::tensor_to_entity_response(maybe_input_message.value(), response);
response_queue_->push(response);

auto maybe_system_message = op_input.receive<holoscan::gxf::Entity>("system");
if (maybe_system_message) {
holoscan::ops::TensorProto::tensor_to_entity_response(maybe_system_message.value(), response);
tensors++;
}

auto maybe_device_message = op_input.receive<holoscan::gxf::Entity>("device");
if (maybe_device_message) {
holoscan::ops::TensorProto::tensor_to_entity_response(maybe_device_message.value(), response);
tensors++;
}

if (tensors > 0) { response_queue_->push(response); }
}

private:
Parameter<std::vector<std::string>> device_to_system_tensors_;
Parameter<std::shared_ptr<ConditionVariableQueue<std::shared_ptr<EntityResponse>>>>
response_queue_;
;
};

class GrpcServerRequestOp : public holoscan::Operator {
Expand All @@ -68,9 +81,7 @@ class GrpcServerRequestOp : public holoscan::Operator {
if (server_thread_.joinable()) { server_thread_.join(); }
}

void initialize() override {
Operator::initialize();
}
void initialize() override { Operator::initialize(); }

void setup(OperatorSpec& spec) override {
spec.param(server_address_, "server_address", "Server Address", "gRPC Server Address.");
Expand Down
Loading

0 comments on commit 8cd1cd2

Please sign in to comment.