Skip to content

Commit

Permalink
Polarfire ethernet sensor bridge peoplenet and bodypose application s…
Browse files Browse the repository at this point in the history
…upport

Signed-off-by: Sunny B <[email protected]>
  • Loading branch information
sunny9661 committed Nov 1, 2024
1 parent fe04c1e commit d8ce78c
Show file tree
Hide file tree
Showing 2 changed files with 577 additions and 0 deletions.
287 changes: 287 additions & 0 deletions examples/linux_body_pose_estimation_imx477.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
# SPDX-FileCopyrightText: Copyright (c) 2023-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# See README.md for detailed information.

import argparse
import ctypes
import logging
import os

import holoscan
from body_pose_estimation import FormatInferenceInputOp, PostprocessorOp
from cuda import cuda

import hololink as hololink_module


class HoloscanApplication(holoscan.core.Application):
def __init__(
self,
headless,
fullscreen,
cuda_context,
cuda_device_ordinal,
hololink_channel,
camera,
frame_limit,
engine,
):
logging.info("__init__")
super().__init__()
self._headless = headless
self._fullscreen = fullscreen
self._cuda_context = cuda_context
self._cuda_device_ordinal = cuda_device_ordinal
self._hololink_channel = hololink_channel
self._camera = camera
self._frame_limit = frame_limit
self._engine = engine

def compose(self):
logging.info("compose")
if self._frame_limit:
self._count = holoscan.conditions.CountCondition(
self,
name="count",
count=self._frame_limit,
)
condition = self._count
else:
self._ok = holoscan.conditions.BooleanCondition(
self, name="ok", enable_tick=True
)
condition = self._ok

csi_to_bayer_pool = holoscan.resources.BlockMemoryPool(
self,
name="pool",
# storage_type of 1 is device memory
storage_type=1,
block_size=self._camera._width
* ctypes.sizeof(ctypes.c_uint16)
* self._camera._height,
num_blocks=2,
)
csi_to_bayer_operator = hololink_module.operators.CsiToBayerOp(
self,
name="csi_to_bayer",
allocator=csi_to_bayer_pool,
cuda_device_ordinal=self._cuda_device_ordinal,
)
self._camera.configure_converter(csi_to_bayer_operator)

frame_size = csi_to_bayer_operator.get_csi_length()
frame_context = self._cuda_context
receiver_operator = hololink_module.operators.LinuxReceiverOperator(
self,
condition,
name="receiver",
frame_size=frame_size,
frame_context=frame_context,
hololink_channel=self._hololink_channel,
device=self._camera,
)

bayer_format = self._camera.bayer_format()
pixel_format = self._camera.pixel_format()
image_processor_operator = hololink_module.operators.ImageProcessorOp(
self,
name="image_processor",
# Optical black value for imx274 is 50
optical_black=50,
bayer_format=bayer_format.value,
pixel_format=pixel_format.value,
)

rgb_components_per_pixel = 3
bayer_pool = holoscan.resources.BlockMemoryPool(
self,
name="pool",
# storage_type of 1 is device memory
storage_type=1,
block_size=self._camera._width
* rgb_components_per_pixel
* ctypes.sizeof(ctypes.c_uint16)
* self._camera._height,
num_blocks=2,
)
demosaic = holoscan.operators.BayerDemosaicOp(
self,
name="demosaic",
pool=bayer_pool,
generate_alpha=False,
bayer_grid_pos=bayer_format.value,
interpolation_mode=0,
)

gamma_correction = hololink_module.operators.GammaCorrectionOp(
self,
name="gamma_correction",
cuda_device_ordinal=self._cuda_device_ordinal,
)

image_shift = hololink_module.operators.ImageShiftToUint8Operator(
self, name="image_shift", shift=8
)

visualizer = holoscan.operators.HolovizOp(
self,
name="holoviz",
fullscreen=self._fullscreen,
headless=self._headless,
**self.kwargs("holoviz"),
)

#
pool = holoscan.resources.UnboundedAllocator(self)
preprocessor_args = self.kwargs("preprocessor")
preprocessor = holoscan.operators.FormatConverterOp(
self,
name="preprocessor",
pool=pool,
**preprocessor_args,
)
format_input = FormatInferenceInputOp(
self,
name="transpose",
pool=pool,
)
inference = holoscan.operators.InferenceOp(
self,
name="inference",
allocator=pool,
model_path_map={
"yolo_pose": self._engine,
},
**self.kwargs("inference"),
)
postprocessor_args = self.kwargs("postprocessor")
postprocessor_args["image_width"] = preprocessor_args["resize_width"]
postprocessor_args["image_height"] = preprocessor_args["resize_height"]
postprocessor = PostprocessorOp(
self,
name="postprocessor",
allocator=pool,
**postprocessor_args,
)

#
self.add_flow(receiver_operator, csi_to_bayer_operator, {("output", "input")})
self.add_flow(
csi_to_bayer_operator, image_processor_operator, {("output", "input")}
)
self.add_flow(image_processor_operator, demosaic, {("output", "receiver")})
self.add_flow(demosaic, gamma_correction, {("transmitter", "input")})
self.add_flow(gamma_correction, image_shift)
self.add_flow(image_shift, visualizer, {("output", "receivers")})
self.add_flow(image_shift, preprocessor, {("output", "")})
self.add_flow(preprocessor, format_input)
self.add_flow(format_input, inference, {("", "receivers")})
self.add_flow(inference, postprocessor, {("transmitter", "in")})
self.add_flow(postprocessor, visualizer, {("out", "receivers")})


def main():
parser = argparse.ArgumentParser()
parser.add_argument("--headless", action="store_true", help="Run in headless mode")
parser.add_argument(
"--fullscreen", action="store_true", help="Run in fullscreen mode"
)
parser.add_argument(
"--frame-limit",
type=int,
default=None,
help="Exit after receiving this many frames",
)
default_configuration = os.path.join(
os.path.dirname(__file__), "body_pose_estimation.yaml"
)
parser.add_argument(
"--configuration", default=default_configuration, help="Configuration file"
)
default_engine = os.path.join(os.path.dirname(__file__), "yolov8n-pose.onnx")
parser.add_argument(
"--engine",
default=default_engine,
help="TRT engine model",
)
parser.add_argument(
"--log-level",
type=int,
default=20,
help="Logging level to display",
)
parser.add_argument(
"--cam",
type=int,
default=0,
choices=(0, 1),
help="which camera to stream: 0 to stream camera connected to j14 or 1 to stream camera connected to j17 (default is 0)",
)
args = parser.parse_args()
hololink_module.logging_level(args.log_level)
logging.info("Initializing.")
# Get a handle to the GPU
(cu_result,) = cuda.cuInit(0)
assert cu_result == cuda.CUresult.CUDA_SUCCESS
cu_device_ordinal = 0
cu_result, cu_device = cuda.cuDeviceGet(cu_device_ordinal)
assert cu_result == cuda.CUresult.CUDA_SUCCESS
cu_result, cu_context = cuda.cuDevicePrimaryCtxRetain(cu_device)
assert cu_result == cuda.CUresult.CUDA_SUCCESS

# Get a handle to the Hololink device
if args.cam == 0:
channel_metadata = hololink_module.Enumerator.find_channel(
channel_ip="192.168.0.2"
)
elif args.cam == 1:
channel_metadata = hololink_module.Enumerator.find_channel(
channel_ip="192.168.0.3"
)
else:
raise Exception(f"Unexpected camera={args.cam}")

hololink_channel = hololink_module.DataChannel(channel_metadata)
# Get a handle to the camera
camera = hololink_module.sensors.imx477.Imx477(hololink_channel, args.cam)

# Set up the application
application = HoloscanApplication(
args.headless,
args.fullscreen,
cu_context,
cu_device_ordinal,
hololink_channel,
camera,
args.frame_limit,
args.engine,
)
application.config(args.configuration)
# Run it.
hololink = hololink_channel.hololink()
hololink.start()
hololink.reset()
camera.configure()
application.run()
hololink.stop()

(cu_result,) = cuda.cuDevicePrimaryCtxRelease(cu_device)
assert cu_result == cuda.CUresult.CUDA_SUCCESS


if __name__ == "__main__":
main()
Loading

0 comments on commit d8ce78c

Please sign in to comment.