From c6cb9c9fbbe38141c5a8f93a878de2bb78e01b52 Mon Sep 17 00:00:00 2001 From: Minh Nguyen <64875104+MinhxNguyen7@users.noreply.github.com> Date: Thu, 20 Jun 2024 22:58:00 -0700 Subject: [PATCH] Refactor ImagingNode to a Normal Class (#226) --- scripts/imaging_node.py | 390 ---------------------------- uavf_2024/async_utils.py | 7 + uavf_2024/gnc/commander_node.py | 58 +++-- uavf_2024/imaging/__init__.py | 2 + uavf_2024/imaging/camera_control.py | 6 +- uavf_2024/imaging/imaging_types.py | 4 +- uavf_2024/imaging/perception.py | 264 +++++++++++++++++++ uavf_2024/imaging/pose.py | 129 +++++++++ uavf_2024/utilization_logger.py | 85 ++++++ 9 files changed, 524 insertions(+), 421 deletions(-) delete mode 100755 scripts/imaging_node.py create mode 100644 uavf_2024/imaging/perception.py create mode 100644 uavf_2024/imaging/pose.py create mode 100644 uavf_2024/utilization_logger.py diff --git a/scripts/imaging_node.py b/scripts/imaging_node.py deleted file mode 100755 index d12761a4..00000000 --- a/scripts/imaging_node.py +++ /dev/null @@ -1,390 +0,0 @@ -#!/usr/bin/env python3 - -import json -import os -import time -import traceback -from pathlib import Path -from typing import Any, Callable, NamedTuple -from bisect import bisect_left - -import cv2 as cv -import numpy as np -import rclpy -from geometry_msgs.msg import Point, PoseStamped -from rclpy.node import Node -from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy -from scipy.spatial.transform import Rotation, Slerp - -from libuavf_2024.msg import TargetDetection -from libuavf_2024.srv import PointCam, ResetLogDir, TakePicture, ZoomCam -from uavf_2024.async_utils import AsyncBuffer, OnceCallable, RosLoggingProvider, Subscriptions -from uavf_2024.imaging import Camera, ImageProcessor, Localizer - - -def log_exceptions(func): - ''' - Decorator that can be applied to methods on any class that extends - a ros `Node` to make them correctly log exceptions when run through - a roslaunch file - ''' - def wrapped_fn(self,*args, **kwargs): - try: - return func(self, *args, **kwargs) - except Exception: - self.get_logger().error(traceback.format_exc()) - return wrapped_fn - - -class PoseDatum(NamedTuple): - """ - Our representation of the pose data from the Cube. - """ - position: Point - rotation: Rotation - time_seconds: float - - def to_json(self): - return { - "position": [self.position.x, self.position.y, self.position.z], - "rotation": list(self.rotation.as_quat()), - "time_seconds": self.time_seconds - } - - -QOS_PROFILE = QoSProfile( - reliability=ReliabilityPolicy.BEST_EFFORT, - durability=DurabilityPolicy.VOLATILE, - depth = 1 -) - - -class PoseProvider(RosLoggingProvider[PoseStamped, PoseDatum]): - def _subscribe_to_topic(self, action: Callable[[PoseStamped], Any]) -> None: - self.node.create_subscription( - PoseStamped, - '/mavros/local_position/pose', - action, - QOS_PROFILE - ) - - def log_to_file(self, item: PoseDatum): - logs_dir = self.get_logs_dir() - if logs_dir is None: - raise ValueError("Logging directory not given in initialization.") - - path = logs_dir / f"{item.time_seconds:.02f}.json" - with open(path, "w") as f: - json.dump(item.to_json(), f) - - def format_data(self, message: PoseStamped) -> PoseDatum: - quaternion = message.pose.orientation - - return PoseDatum( - position = message.pose.position, - rotation = Rotation.from_quat( - [quaternion.x, quaternion.y, quaternion.z, quaternion.w]), - time_seconds = message.header.stamp.sec + message.header.stamp.nanosec / 1e9 - ) - - def _interpolate_from_buffer(self, time_seconds: float, wait: bool = False) -> PoseDatum | None: - """ - Returns the pose datum interpolated between the two data points before and after the time given. - - If this interpolation is not possible because the pose is too old, returns the oldest pose. - If this interpolation is not possible because the poses are not old enough, wait until enough data is available if wait is enabled. - Otherwise, return the newest pose - If there is no pose available, wait if enabled. Otherwise, return None. - """ - if self._buffer.count == 0: - return None - - data = self._buffer.get_all_reversed() - - closest_idx = bisect_left([d.time_seconds for d in data], time_seconds) - - if closest_idx == 0: - return data[0] - - # Poll every 100ms - while closest_idx == len(data): - if not wait: - return data[closest_idx - 1] - - time.sleep(0.1) - data = self._buffer.get_all_reversed() - closest_idx = bisect_left([d.time_seconds for d in data], time_seconds) - - pt_before = data[closest_idx - 1] - pt_after = data[closest_idx] - - # Interpolate between the two points - proportion = (time_seconds - pt_before.time_seconds) / (pt_after.time_seconds - pt_before.time_seconds) - position = Point( - x = pt_before.position.x + proportion * (pt_after.position.x - pt_before.position.x), - y = pt_before.position.y + proportion * (pt_after.position.y - pt_before.position.y), - z = pt_before.position.z + proportion * (pt_after.position.z - pt_before.position.z) - ) - key_times = [pt_before.time_seconds, pt_after.time_seconds] - rotations = Rotation.concatenate([pt_before.rotation, pt_after.rotation]) - slerp = Slerp(key_times, rotations) - rotation = Rotation.from_quat(slerp([time_seconds]).as_quat()[0]) - # go to and from quaternion to force returned - # object to be single rotation - - return PoseDatum( - position = position, - rotation = rotation, - time_seconds = time_seconds - ) - - def get_interpolated(self, time_seconds: float) -> tuple[PoseDatum, bool]: - ''' - Gets the pose interpolated at `time_seconds`. Blocks if we don't have enough data in the queue - to interpolate yet, for a maximum of 5 seconds before giving up and returning the latest datum - regardless of its timestamp - - ''' - for _ in range(50): - interp_pose = self._interpolate_from_buffer(time_seconds, True) - if interp_pose is not None: - return (interp_pose, True) - else: - time.sleep(0.1) - return (self._buffer.get_fresh(), False) - - -class ImagingNode(Node): - @log_exceptions - def __init__(self) -> None: - # Initialize the node - super().__init__('imaging_node') # type: ignore - self.logs_path = Path(f'/media/forge/SANDISK/logs/{time.strftime("%m-%d %Hh%Mm")}') - - self.camera = Camera(self.logs_path / "camera") - self.zoom_level = 3 - self.camera_state = True # True if camera is pointing down for auto-cam-point. Only for auto-point FSM - self.camera.setAbsoluteZoom(self.zoom_level) - - self.log(f"Logging to {self.logs_path}") - self.image_processor = ImageProcessor(self.logs_path / "image_processor") - - # Set up ROS connections - self.log(f"Setting up imaging node ROS connections") - - # Subscriptions ---- - self.pose_provider = PoseProvider(self, self.logs_path / "poses", 128) - self.pose_provider.subscribe(self.cam_auto_point) - - # Only start the recording once (when the first pose comes in) - start_recording_once = OnceCallable(lambda _: self.camera.start_recording()) - self.pose_provider.subscribe(start_recording_once) - - # Services ---- - # Set up take picture service - self.imaging_service = self.create_service(TakePicture, 'imaging_service', self.get_image_down) - - # Set up recenter camera service - self.recenter_service = self.create_service(PointCam, 'recenter_service', self.request_point_cb) - # Set up zoom camera service - self.zoom_service = self.create_service(ZoomCam, 'zoom_service', self.setAbsoluteZoom_cb) - # Set up reset log directory service - self.reset_log_dir_service = self.create_service(ResetLogDir, 'reset_log_dir', self.reset_log_dir_cb) - - # Cleanup - self.get_logger().info("Finished initializing imaging node") - - @log_exceptions - def log(self, *args, **kwargs): - self.get_logger().info(*args, **kwargs) - - @log_exceptions - def cam_auto_point(self, current_pose: PoseDatum): - current_z = current_pose.position.z - - first_pose = self.pose_provider.get_first_datum() - if first_pose is None: - self.log("First datum not found trying to auto-point camera.") - return - - alt_from_gnd = current_z - first_pose.position.z - - # If pointed down and close to the ground, point forward - if(self.camera_state and alt_from_gnd < 3): #3 meters ~ 30 feet - self.camera.request_center() - self.camera_state = False - self.camera.stop_recording() - self.log(f"Crossing 3m down, pointing forward. Current altitude: {alt_from_gnd}") - # If pointed forward and altitude is higher, point down - elif(not self.camera_state and alt_from_gnd > 3): - self.camera.request_down() - self.camera_state = True - self.camera.start_recording() - self.log(f"Crossing 3m up, pointing down. Current altitude: {alt_from_gnd}") - else: - return - self.camera.request_autofocus() - - - @log_exceptions - def request_point_cb(self, request, response): - self.log(f"Received Point Camera Down Request: {request}") - if request.down: - response.success = self.camera.request_down() - else: - response.success = self.camera.request_center() - self.camera.request_autofocus() - return response - - @log_exceptions - def setAbsoluteZoom_cb(self, request, response): - self.log(f"Received Set Zoom Request: {request}") - response.success = self.camera.setAbsoluteZoom(request.zoom_level) - self.camera.request_autofocus() - self.zoom_level = request.zoom_level - return response - - @log_exceptions - def reset_log_dir_cb(self, request, response): - new_logs_dir = Path('logs/{strftime("%m-%d %H:%M")}') - self.log(f"Starting new log directory at {new_logs_dir}") - os.makedirs(new_logs_dir, exist_ok = True) - self.image_processor.reset_log_directory(new_logs_dir / 'image_processor') - self.camera.set_log_dir(new_logs_dir / 'camera') - response.success = True - return response - - @log_exceptions - def make_localizer(self): - focal_len = self.camera.getFocalLength() - - first_pose = self.pose_provider.get_first_datum() - if first_pose is None: - self.log("First datum does not exist trying to make Localizer.") - return - - localizer = Localizer.from_focal_length( - focal_len, - (1920, 1080), - (np.array([1,0,0]), np.array([0,-1, 0])), - 2, - first_pose.position.z - 0.15 # cube is about 15cm off ground - ) - return localizer - - @log_exceptions - def point_camera_down(self): - self.camera.request_down() - while abs(self.camera.getAttitude()[1] - -90) > 2: - self.log(f"Waiting to point down. Current angle: {self.camera.getAttitude()[1] } . " ) - time.sleep(0.1) - self.log("Camera pointed down") - - @log_exceptions - def get_image_down(self, request, response: list[TargetDetection]) -> list[TargetDetection]: - ''' - autofocus, then wait till cam points down, take pic, - - We want to take photo when the attitude is down only. - ''' - self.log("Received Down Image Request") - self.camera.request_autofocus() - self.pose_provider.wait_for_data() - - if abs(self.camera.getAttitude()[1] - -90) > 5: # Allow 5 degrees of error (Arbitrary) - self.point_camera_down() - - #TODO: Figure out a way to detect when the gimbal is having an aneurism and figure out how to fix it or send msg to groundstation. - - # Take picture and grab relevant data - img = self.camera.get_latest_image() - if img is None: - self.log("Could not get image from Camera.") - return [] - timestamp = time.time() - self.log(f"Got image from Camera at time {timestamp}") - - localizer = self.make_localizer() - if localizer is None: - self.log("Could not get Localizer") - return [] - - detections = self.image_processor.process_image(img) - self.log(f"Finished image processing. Got {len(detections)} detections") - - # Get avg camera pose for the image - angles = self.camera.getAttitudeInterpolated(timestamp) - self.log(f"Got camera attitude: {angles}") - - # Get the pose measured 0.75 seconds before we received the image - # This is to account for the delay in the camera system, and was determined empirically - pose, is_timestamp_interpolated = self.pose_provider.get_interpolated(timestamp - 0.75) - if not is_timestamp_interpolated: - self.log("Couldn't interpolate pose.") - self.log(f"Got pose: {angles}") - - cur_position_np = np.array([pose.position.x, pose.position.y, pose.position.z]) - cur_rot_quat = pose.rotation.as_quat() - - world_orientation = self.camera.orientation_in_world_frame(pose.rotation, angles) - cam_pose = (cur_position_np, world_orientation) - - # Get 3D predictions - preds_3d = [localizer.prediction_to_coords(d, cam_pose) for d in detections] - - # Log data - logs_folder = self.image_processor.get_last_logs_path() - self.log(f"This frame going to {logs_folder}") - self.log(f"Zoom level: {self.zoom_level}") - os.makedirs(logs_folder, exist_ok=True) - cv.imwrite(f"{logs_folder}/image.png", img.get_array()) - log_data = { - 'pose_time': pose.time_seconds, - 'image_time': timestamp, - 'drone_position': cur_position_np.tolist(), - 'drone_q': cur_rot_quat.tolist(), - 'gimbal_yaw': angles[0], - 'gimbal_pitch': angles[1], - 'gimbal_roll': angles[2], - 'zoom level': self.zoom_level, - 'preds_3d': [ - { - 'position': p.position.tolist(), - 'id': p.id, - } for p in preds_3d - ] - } - json.dump(log_data, open(f"{logs_folder}/data.json", 'w+'), indent=4) - - response.detections = [] - for i, p in enumerate(preds_3d): - t = TargetDetection( - timestamp = int(timestamp*1000), - x = p.position[0], - y = p.position[1], - z = p.position[2], - shape_conf = p.descriptor.shape_probs.tolist(), - letter_conf = p.descriptor.letter_probs.tolist(), - shape_color_conf = p.descriptor.shape_col_probs.tolist(), - letter_color_conf = p.descriptor.letter_col_probs.tolist(), - id = p.id - ) - - response.detections.append(t) - - return response - - -def main(args=None) -> None: - print('Starting imaging node...') - rclpy.init(args=args) - node = ImagingNode() - rclpy.spin(node) - - - -if __name__ == '__main__': - try: - main() - except Exception as e: - print(e) diff --git a/uavf_2024/async_utils.py b/uavf_2024/async_utils.py index 1361448f..81feb58b 100644 --- a/uavf_2024/async_utils.py +++ b/uavf_2024/async_utils.py @@ -176,6 +176,13 @@ def __init__( def log(self, message, level = logging.INFO): self.logger.log(level, message) + + def set_log_dir(self, new_dir: str | os.PathLike | Path): + self._logs_dir = Path(new_dir) + if not self._logs_dir.exists(): + self._logs_dir.mkdir(parents=True) + elif not self._logs_dir.is_dir(): + raise FileExistsError(f"{self._logs_dir} exists but is not a directory") @abstractmethod def _subscribe_to_topic(self, action: Callable[[MessageT], Any]) -> None: diff --git a/uavf_2024/gnc/commander_node.py b/uavf_2024/gnc/commander_node.py index e2916b53..ea52e602 100644 --- a/uavf_2024/gnc/commander_node.py +++ b/uavf_2024/gnc/commander_node.py @@ -1,22 +1,27 @@ -import std_msgs.msg -import mavros_msgs.msg -import mavros_msgs.srv +from concurrent.futures import Future, as_completed +from datetime import datetime +import time +import logging + +import numpy as np +from scipy.spatial.transform import Rotation as R + import rclpy import rclpy.node from rclpy.qos import * + +import mavros_msgs.msg +import mavros_msgs.srv import sensor_msgs.msg import geometry_msgs.msg import libuavf_2024.srv -from uavf_2024.imaging.imaging_types import ROSDetectionMessage, Target3D + from uavf_2024.gnc.util import pose_to_xy, convert_delta_gps_to_local_m, convert_local_m_to_delta_gps, read_payload_list, read_gpx_file, validate_points from uavf_2024.gnc.dropzone_planner import DropzonePlanner -from scipy.spatial.transform import Rotation as R -import time -import logging -from datetime import datetime -import numpy as np from uavf_2024.gnc.mission_messages import * +from uavf_2024.imaging import Perception, PoseProvider, Target3D + TAKEOFF_ALTITUDE = 20.0 class CommanderNode(rclpy.node.Node): @@ -85,10 +90,6 @@ def __init__(self, args): 'mavros/mission/reached', self.reached_cb, qos_profile) - - self.imaging_client = self.create_client( - libuavf_2024.srv.TakePicture, - '/imaging_service') self.got_home_pos = False self.home_position_sub = self.create_subscription( @@ -113,8 +114,9 @@ def __init__(self, args): self.dropzone_planner = DropzonePlanner(self, args.image_width_m, args.image_height_m) self.args = args + self.perception = Perception(PoseProvider(self)) + self.perception_futures: list[Future[list[Target3D]]] = [] self.call_imaging_at_wps = False - self.imaging_futures = [] self.turn_angle_limit = 170 @@ -141,7 +143,7 @@ def reached_cb(self, reached): self.do_imaging_call() def do_imaging_call(self): - self.imaging_futures.append(self.imaging_client.call_async(libuavf_2024.srv.TakePicture.Request())) + self.perception_futures.append(self.perception.get_image_down_async()) def got_pose_cb(self, pose): self.cur_pose = pose @@ -279,18 +281,22 @@ def release_payload(self): ) time.sleep(.05) - def gather_imaging_detections(self): - detections = [] + def gather_imaging_detections(self, timeout: float | None = None): + """ + Waits for all imaging detections to be completed and returns them. + """ + detections: list[Target3D] = [] self.log("Waiting for imaging detections.") - for future in self.imaging_futures: - while not future.done(): - pass - for detection in future.result().detections: - detections.append(Target3D.from_ros(ROSDetectionMessage(detection.timestamp, detection.x, detection.y, detection.z, - detection.shape_conf, detection.letter_conf, - detection.shape_color_conf, detection.letter_color_conf, detection.id))) - self.imaging_futures = [] - self.log(f"Successfully retrieved imaging detections: {detections}") + try: + for future in as_completed(self.perception_futures, timeout=timeout): + detections.extend(future.result()) + + self.log(f"Successfully retrieved imaging detections: {detections}") + except TimeoutError: + self.log("Timed out waiting for imaging detections.") + + self.perception_futures = [] + return detections def request_load_payload(self, payload): diff --git a/uavf_2024/imaging/__init__.py b/uavf_2024/imaging/__init__.py index 7faeea70..763872cc 100644 --- a/uavf_2024/imaging/__init__.py +++ b/uavf_2024/imaging/__init__.py @@ -7,3 +7,5 @@ from .tracker import TargetTracker from .imaging_types import ProbabilisticTargetDescriptor, CertainTargetDescriptor, Target3D, Image, BoundingBox from .drone_tracker import DroneTracker +from .pose import PoseProvider, PoseDatum +from .perception import Perception diff --git a/uavf_2024/imaging/camera_control.py b/uavf_2024/imaging/camera_control.py index af867772..a7e89b36 100644 --- a/uavf_2024/imaging/camera_control.py +++ b/uavf_2024/imaging/camera_control.py @@ -50,11 +50,11 @@ def __init__(self): self.image = None self.lock = threading.Lock() - def put(self, image: Image): + def put(self, image: Image[np.ndarray]): with self.lock: self.image = image - def get_latest(self) -> Image | None: + def get_latest(self) -> Image[np.ndarray] | None: with self.lock: return self.image @@ -205,7 +205,7 @@ def stop_recording(self): self.recording_thread.join() self.recording_thread = None - def get_latest_image(self) -> Image | None: + def get_latest_image(self) -> Image[np.ndarray] | None: """ Returns the latest Image (HWC) from the buffer. """ diff --git a/uavf_2024/imaging/imaging_types.py b/uavf_2024/imaging/imaging_types.py index 852fef7d..4e48b97c 100644 --- a/uavf_2024/imaging/imaging_types.py +++ b/uavf_2024/imaging/imaging_types.py @@ -359,7 +359,7 @@ def __init__( if array.shape[channels_index] != 3: raise ValueError("Image array must have 3 channels, got " + str(array[channels_index])) - self._array = array + self._array: _UnderlyingImageT = array def __getitem__(self, key): return self._array[key] @@ -386,7 +386,7 @@ def __mul__(self, other: number | _UnderlyingImageT) -> 'Image': """ return Image(self._array * other, self._dim_order) - def get_array(self): + def get_array(self) -> _UnderlyingImageT: return self._array def make_sub_image(self, x_coord, y_coord, width, height) -> 'Image': diff --git a/uavf_2024/imaging/perception.py b/uavf_2024/imaging/perception.py new file mode 100644 index 00000000..83ceb29e --- /dev/null +++ b/uavf_2024/imaging/perception.py @@ -0,0 +1,264 @@ +""" +Perception class to supercede ImagingNode by using Python-native async functionality +""" + + +from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor +import json +import logging +import os +from pathlib import Path +import time +from typing import Literal, Sequence + +import numpy as np +import cv2 + +from . import ImageProcessor, Camera, Localizer, PoseProvider, PoseDatum, Target3D, Image + + +LOGS_PATH = Path(f'logs/{time.strftime("%m-%d %Hh%Mm")}') + + +class Perception: + """ + External interface for the perception system. + """ + + # We need to use the singleton pattern because there can only be one Camera, + # and this class manages a process pool + _INSTANCE: "Perception | None" = None + + @staticmethod + def _get_existing_instance() -> "Perception | None": + """ + Get the existing instance of Perception, or None if it doesn't exist. + """ + return Perception._INSTANCE + + def __init__(self, pose_provider: PoseProvider, zoom_level: int = 3, logs_path: Path = LOGS_PATH): + """ + A PoseProvder must be injected because it depends on a MAVROS subscription. + This might not be neccessary in the future if we can get that data from MAVSDK. + """ + if Perception._INSTANCE is not None: + raise ValueError("Only one instance of Perception can be created.") + else: + Perception._INSTANCE = self + + print("Initializing Perception. Logging to", LOGS_PATH) + + self.logger = logging.getLogger('perception') + + self.zoom_level = zoom_level + self.logs_path = logs_path + + # Set up camera + self.camera = Camera(LOGS_PATH / 'camera') + self.camera.setAbsoluteZoom(zoom_level) + + self.image_processor = ImageProcessor(LOGS_PATH / 'image_processor') + + # There can only be one process because it uses the GPU + self.processor_pool = ProcessPoolExecutor(1) + + self.logging_pool = ThreadPoolExecutor(2) + + # This has to be injected because it needs to subscribe to the mavros topic + self.pose_provider = pose_provider + self.pose_provider.subscribe(self.cam_auto_point) + + def log(self, *args, **kwargs): + self.logger.info(*args, **kwargs) + + def cam_auto_point(self, current_pose: PoseDatum): + current_z = current_pose.position.z + + first_pose = self.pose_provider.get_first_datum() + if first_pose is None: + self.log("First datum not found trying to auto-point camera.") + return + + alt_from_gnd = current_z - first_pose.position.z + + # If pointed down and close to the ground, point forward + if(self.camera_state and alt_from_gnd < 3): #3 meters ~ 30 feet + self.camera.request_center() + self.camera_state = False + self.camera.stop_recording() + self.log(f"Crossing 3m down, pointing forward. Current altitude: {alt_from_gnd}") + # If pointed forward and altitude is higher, point down + elif(not self.camera_state and alt_from_gnd > 3): + self.camera.request_down() + self.camera_state = True + self.camera.start_recording() + self.log(f"Crossing 3m up, pointing down. Current altitude: {alt_from_gnd}") + else: + return + self.camera.request_autofocus() + + + def request_point(self, direction: Literal["down", "center"]) -> bool: + self.log(f"Received Point Camera Down Request: {direction}") + + match direction: + case "down": + success: bool = self.camera.request_down() + case "center": + success: bool = self.camera.request_center() + case _: + raise ValueError(f"Invalid direction: {direction}") + + self.camera.request_autofocus() + + return success + + def set_absolute_zoom(self, zoom_level: int) -> bool: + self.log(f"Received Set Zoom Request to {zoom_level}") + + success: bool = self.camera.setAbsoluteZoom(zoom_level) + self.camera.request_autofocus() + + self.zoom_level = zoom_level + return success + + def reset_log_dir(self): + new_logs_dir = Path('logs/{strftime("%m-%d %H:%M")}') + self.log(f"Starting new log directory at {new_logs_dir}") + os.makedirs(new_logs_dir, exist_ok = True) + + self.image_processor.reset_log_directory(new_logs_dir / 'image_processor') + self.camera.set_log_dir(new_logs_dir / 'camera') + self.pose_provider.set_log_dir(new_logs_dir / 'pose') + + def make_localizer(self): + focal_len = self.camera.getFocalLength() + + first_pose = self.pose_provider.get_first_datum() + if first_pose is None: + self.log("First datum does not exist trying to make Localizer.") + return + + localizer = Localizer.from_focal_length( + focal_len, + (1920, 1080), + (np.array([1,0,0]), np.array([0,-1, 0])), + 2, + first_pose.position.z - 0.15 # cube is about 15cm off ground + ) + return localizer + + def point_camera_down(self, wait: bool = True): + self.camera.request_down() + + if not wait: + self.log("Camera pointing down, but not waiting.") + return + + while abs(self.camera.getAttitude()[1] - -90) > 2: + self.log(f"Waiting to point down. Current angle: {self.camera.getAttitude()[1] } . " ) + time.sleep(0.1) + self.log("Camera pointed down") + + def get_image_down_async(self) -> Future[list[Target3D]]: + """ + Non-blocking implementation of the infrence pipeline, + calling get_image_down in a separate process. + """ + return self.processor_pool.submit(self.get_image_down) + + def _log_image_down( + self, + preds_3d: list[Target3D], + img: Image[np.ndarray], + timestamp: float, + pose: PoseDatum, + angles: Sequence[float], + cur_position_np: np.ndarray, + cur_rot_quat: np.ndarray + ): + logs_folder = self.image_processor.get_last_logs_path() + os.makedirs(logs_folder, exist_ok=True) + cv2.imwrite(f"{logs_folder}/image.png", img.get_array()) + + self.log(f"Logging inference frame at zoom level {self.zoom_level} to {logs_folder}") + + log_data = { + 'pose_time': pose.time_seconds, + 'image_time': timestamp, + 'drone_position': cur_position_np.tolist(), + 'drone_q': cur_rot_quat.tolist(), + 'gimbal_yaw': angles[0], + 'gimbal_pitch': angles[1], + 'gimbal_roll': angles[2], + 'zoom level': self.zoom_level, + 'preds_3d': [ + { + 'position': p.position.tolist(), + 'id': p.id, + } for p in preds_3d + ] + } + + json.dump(log_data, open(f"{logs_folder}/data.json", 'w+'), indent=4) + + def get_image_down(self) -> list[Target3D]: + """ + Blocking implementation of the infrence pipeline. + + Autofocus, then wait till cam points down, take pic, + + We want to take photo when the attitude is down only. + """ + self.log("Received Down Image Request") + self.camera.request_autofocus() + self.pose_provider.wait_for_data() + + if abs(self.camera.getAttitude()[1] - -90) > 5: # Allow 5 degrees of error (Arbitrary) + self.point_camera_down() + + #TODO: Figure out a way to detect when the gimbal is having an aneurism and figure out how to fix it or send msg to groundstation. + + # Take picture and grab relevant data + img = self.camera.get_latest_image() + if img is None: + self.log("Could not get image from Camera.") + return [] + timestamp = time.time() + self.log(f"Got image from Camera at time {timestamp}") + + localizer = self.make_localizer() + if localizer is None: + self.log("Could not get Localizer") + return [] + + detections = self.image_processor.process_image(img) + self.log(f"Finished image processing. Got {len(detections)} detections") + + # Get avg camera pose for the image + angles = self.camera.getAttitudeInterpolated(timestamp) + self.log(f"Got camera attitude: {angles}") + + # Get the pose measured 0.75 seconds before we received the image + # This is to account for the delay in the camera system, and was determined empirically + pose, is_timestamp_interpolated = self.pose_provider.get_interpolated(timestamp - 0.75) + if not is_timestamp_interpolated: + self.log("Couldn't interpolate pose.") + self.log(f"Got pose: {angles}") + + cur_position_np = np.array([pose.position.x, pose.position.y, pose.position.z]) + cur_rot_quat = pose.rotation.as_quat() # type: ignore + + world_orientation = self.camera.orientation_in_world_frame(pose.rotation, angles) + cam_pose = (cur_position_np, world_orientation) + + # Get 3D predictions + preds_3d = [localizer.prediction_to_coords(d, cam_pose) for d in detections] + + # Log data asynchronously + self.logging_pool.submit( + self._log_image_down, + preds_3d, img, timestamp, pose, angles, cur_position_np, cur_rot_quat + ) + + return preds_3d diff --git a/uavf_2024/imaging/pose.py b/uavf_2024/imaging/pose.py new file mode 100644 index 00000000..57c799d5 --- /dev/null +++ b/uavf_2024/imaging/pose.py @@ -0,0 +1,129 @@ +from bisect import bisect_left +import json +import time +from typing import Any, Callable, NamedTuple + +from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy +from geometry_msgs.msg import Point, PoseStamped +from scipy.spatial.transform import Rotation, Slerp + +from uavf_2024.async_utils import RosLoggingProvider + + +class PoseDatum(NamedTuple): + """ + Our representation of the pose data from the Cube. + """ + position: Point + rotation: Rotation + time_seconds: float + + def to_json(self): + return { + "position": [self.position.x, self.position.y, self.position.z], + "rotation": list(self.rotation.as_quat()), + "time_seconds": self.time_seconds + } + + +QOS_PROFILE = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + durability=DurabilityPolicy.VOLATILE, + depth = 1 +) + + +# TODO: Refactor to MAVSDK +class PoseProvider(RosLoggingProvider[PoseStamped, PoseDatum]): + def _subscribe_to_topic(self, action: Callable[[PoseStamped], Any]) -> None: + self.node.create_subscription( + PoseStamped, + '/mavros/local_position/pose', + action, + QOS_PROFILE + ) + + def log_to_file(self, item: PoseDatum): + logs_dir = self.get_logs_dir() + if logs_dir is None: + raise ValueError("Logging directory not given in initialization.") + + path = logs_dir / f"{item.time_seconds:.02f}.json" + with open(path, "w") as f: + json.dump(item.to_json(), f) + + def format_data(self, message: PoseStamped) -> PoseDatum: + quaternion = message.pose.orientation + + return PoseDatum( + position = message.pose.position, + rotation = Rotation.from_quat( + [quaternion.x, quaternion.y, quaternion.z, quaternion.w]), + time_seconds = message.header.stamp.sec + message.header.stamp.nanosec / 1e9 + ) + + def _interpolate_from_buffer(self, time_seconds: float, wait: bool = False) -> PoseDatum | None: + """ + Returns the pose datum interpolated between the two data points before and after the time given. + + If this interpolation is not possible because the pose is too old, returns the oldest pose. + If this interpolation is not possible because the poses are not old enough, wait until enough data is available if wait is enabled. + Otherwise, return the newest pose + If there is no pose available, wait if enabled. Otherwise, return None. + """ + if self._buffer.count == 0: + return None + + data = self._buffer.get_all_reversed() + + closest_idx = bisect_left([d.time_seconds for d in data], time_seconds) + + if closest_idx == 0: + return data[0] + + # Poll every 100ms + while closest_idx == len(data): + if not wait: + return data[closest_idx - 1] + + time.sleep(0.1) + data = self._buffer.get_all_reversed() + closest_idx = bisect_left([d.time_seconds for d in data], time_seconds) + + pt_before = data[closest_idx - 1] + pt_after = data[closest_idx] + + # Interpolate between the two points + proportion = (time_seconds - pt_before.time_seconds) / (pt_after.time_seconds - pt_before.time_seconds) + position = Point( + x = pt_before.position.x + proportion * (pt_after.position.x - pt_before.position.x), + y = pt_before.position.y + proportion * (pt_after.position.y - pt_before.position.y), + z = pt_before.position.z + proportion * (pt_after.position.z - pt_before.position.z) + ) + key_times = [pt_before.time_seconds, pt_after.time_seconds] + rotations = Rotation.concatenate([pt_before.rotation, pt_after.rotation]) + slerp = Slerp(key_times, rotations) + rotation = Rotation.from_quat(slerp([time_seconds]).as_quat()[0]) + # go to and from quaternion to force returned + # object to be single rotation + + return PoseDatum( + position = position, + rotation = rotation, + time_seconds = time_seconds + ) + + def get_interpolated(self, time_seconds: float) -> tuple[PoseDatum, bool]: + ''' + Gets the pose interpolated at `time_seconds`. Blocks if we don't have enough data in the queue + to interpolate yet, for a maximum of 5 seconds before giving up and returning the latest datum + regardless of its timestamp + + ''' + for _ in range(50): + interp_pose = self._interpolate_from_buffer(time_seconds, True) + if interp_pose is not None: + return (interp_pose, True) + else: + time.sleep(0.1) + return (self._buffer.get_fresh(), False) diff --git a/uavf_2024/utilization_logger.py b/uavf_2024/utilization_logger.py new file mode 100644 index 00000000..d33b071a --- /dev/null +++ b/uavf_2024/utilization_logger.py @@ -0,0 +1,85 @@ +from concurrent.futures import ThreadPoolExecutor +from itertools import chain +from pathlib import Path +import csv +import time + +from psutil import cpu_percent, virtual_memory, cpu_count + + +class UtilizationLogger: + """ + This class is responsible for logging CPU and memory utilization to a csv file. + + TODO: Log GPU utilization as well. + """ + def __init__(self, csv_path: Path, period: float = 0.2, start = True): + self.period = period + self.writer = UtilizationLogger._prepare_csv(csv_path) + + # This ensures that there's no race condition + self.pool = ThreadPoolExecutor(max_workers=1) + self.logging = start + + @staticmethod + def _prepare_csv(path: Path): + """ + Prepares the CSV file by creating it and writing the header dependigng on the number of CPUs. + """ + path.parent.mkdir(parents=True, exist_ok=True) + + writer = csv.writer(path.open('w')) + num_cpus = cpu_count() + writer.writerow( + chain(['time', 'memory', 'cpu'], (f'cpu{i}' for i in range(num_cpus))) + ) + + return writer + + def log(self): + """ + Logs the current CPU and memory utilization. + """ + timestamp = time.time() + memory = virtual_memory().percent + cpu_avg = cpu_percent() + cpus = cpu_percent(percpu=True) + + row = chain([timestamp, memory, cpu_avg], cpus) + self.writer.writerow(row) + + return row + + def start(self): + """ + Starts the logging process. + """ + self.logging = True + self.pool.submit(self._log_periodically) + + def stop(self): + """ + Stops the logging process. + """ + self.logging = False + + def _log_periodically(self): + """ + Logs the utilization periodically by submitting the log method to the thread pool, + then sleeping for the period. + + This blocks, so it should be run in a separate thread. + This is done in `start`, which submits it to the thread pool. + """ + while self.logging: + self.pool.submit(self.log) + time.sleep(self.period) + + +if __name__ == '__main__': + logger = UtilizationLogger(Path('utilization.csv')) + logger.start() + time.sleep(10) + logger.stop() + + print('Done!')