diff --git a/log_analysis/graph_pose.py b/log_analysis/graph_pose.py index 84cea9bd..f5b39dfa 100644 --- a/log_analysis/graph_pose.py +++ b/log_analysis/graph_pose.py @@ -19,7 +19,7 @@ img_timestamps = [] img_pixel_diff = [] -dir_name= Path("/home/ericp/uavf_2024/flight_logs/perception_logs_607/06-07 10:39") +dir_name= Path("/home/forge/ws/src/libuavf_2024/flight_logs/06-11 15:01") for fname in tqdm(sorted(os.listdir(dir_name / 'poses'))): @@ -39,32 +39,6 @@ y_rotations.append(euler[1]) z_rotations.append(euler[2]) -alt_timestamps = [] -alt_msl = [] -alt_local = [] -alt_rel = [] -alt_terrain = [] - -for fname in tqdm(sorted(os.listdir(dir_name / 'altitudes'))): - alt = json.load(open(dir_name / 'altitudes' / fname)) - timestamp = float(fname[:-5]) - msl = alt['amsl'] - local = alt['local'] - rel = alt['relative'] - terrain = alt['terrain'] - - alt_timestamps.append(timestamp) - alt_msl.append(msl) - alt_local.append(local) - alt_rel.append(rel) - alt_terrain.append(terrain) - -for (y_name, y_arr) in zip(["msl", "local", "relative", "terrain"], [alt_msl, alt_local, alt_rel, alt_terrain]): - plt.figure() - plt.plot(alt_timestamps, y_arr, label=y_name) - plt.title(f"{y_name} altitude vs time") - plt.savefig(f"{y_name}_vs_time.png") - for (y_name, y_arr) in zip(["x_positions", "y_positions", "z_positions", "x_rotations", "y_rotations", "z_rotations"], [x_positions, y_positions, z_positions, x_rotations, y_rotations, z_rotations]): plt.figure() plt.plot(pose_timestamps, y_arr, label= y_name) diff --git a/log_analysis/plot_timestamps.py b/log_analysis/plot_timestamps.py index ab491e3e..8b7c2b45 100644 --- a/log_analysis/plot_timestamps.py +++ b/log_analysis/plot_timestamps.py @@ -10,7 +10,7 @@ def get_images(images_dir: Path): first = None - for path in tqdm(sorted(images_dir.glob("*.jpg"))): + for path in tqdm(sorted(images_dir.glob("*.json"))): timestamp = float(path.stem) if first is None: first = timestamp @@ -22,15 +22,16 @@ def get_images(images_dir: Path): minute = 43 plt.title("Timestamp vs index") -cam_path = Path(f"/home/ericp/uavf_2024/flight_logs/perception_logs_608/06-08 01:{minute}/camera") +logs_path = Path("/media/forge/SANDISK/logs/06-11 21h01m/") +cam_path = logs_path / "camera" timestamps, imgs = get_images(cam_path) plt.scatter(range(len(timestamps)), timestamps, s=1) -img_process_path = Path(f"/home/ericp/uavf_2024/flight_logs/perception_logs_608/06-08 01:{minute}/image_processor") -for folder in sorted(img_process_path.glob("img_*")): - data = json.load(open(folder / "data.json")) - timestamp = data['image_time'] - plt.plot([0, len(timestamps)], [timestamp, timestamp], color='red', linewidth=1) +# img_process_path = logs_path / "image_processor" +# for folder in sorted(img_process_path.glob("img_*")): +# data = json.load(open(folder / "data.json")) +# timestamp = data['image_time'] +# plt.plot([0, len(timestamps)], [timestamp, timestamp], color='red', linewidth=1) plt.legend() plt.savefig("timestamps_vs_index.png") \ No newline at end of file diff --git a/scripts/imaging_node.py b/scripts/imaging_node.py index be554515..d12761a4 100755 --- a/scripts/imaging_node.py +++ b/scripts/imaging_node.py @@ -59,7 +59,7 @@ def to_json(self): ) -class PoseProvider(RosLoggingProvider[PoseStamped, PoseDatum]): +class PoseProvider(RosLoggingProvider[PoseStamped, PoseDatum]): def _subscribe_to_topic(self, action: Callable[[PoseStamped], Any]) -> None: self.node.create_subscription( PoseStamped, @@ -87,11 +87,14 @@ def format_data(self, message: PoseStamped) -> PoseDatum: time_seconds = message.header.stamp.sec + message.header.stamp.nanosec / 1e9 ) - def _interpolate_from_buffer(self, time_seconds: float) -> PoseDatum | None: + def _interpolate_from_buffer(self, time_seconds: float, wait: bool = False) -> PoseDatum | None: """ Returns the pose datum interpolated between the two data points before and after the time given. - If this interpolation is not possible, returns None. + If this interpolation is not possible because the pose is too old, returns the oldest pose. + If this interpolation is not possible because the poses are not old enough, wait until enough data is available if wait is enabled. + Otherwise, return the newest pose + If there is no pose available, wait if enabled. Otherwise, return None. """ if self._buffer.count == 0: return None @@ -103,6 +106,15 @@ def _interpolate_from_buffer(self, time_seconds: float) -> PoseDatum | None: if closest_idx == 0: return data[0] + # Poll every 100ms + while closest_idx == len(data): + if not wait: + return data[closest_idx - 1] + + time.sleep(0.1) + data = self._buffer.get_all_reversed() + closest_idx = bisect_left([d.time_seconds for d in data], time_seconds) + pt_before = data[closest_idx - 1] pt_after = data[closest_idx] @@ -134,7 +146,7 @@ def get_interpolated(self, time_seconds: float) -> tuple[PoseDatum, bool]: ''' for _ in range(50): - interp_pose = self._interpolate_from_buffer(time_seconds) + interp_pose = self._interpolate_from_buffer(time_seconds, True) if interp_pose is not None: return (interp_pose, True) else: @@ -147,7 +159,7 @@ class ImagingNode(Node): def __init__(self) -> None: # Initialize the node super().__init__('imaging_node') # type: ignore - self.logs_path = Path(f'logs/{time.strftime("%m-%d %H:%M")}') + self.logs_path = Path(f'/media/forge/SANDISK/logs/{time.strftime("%m-%d %Hh%Mm")}') self.camera = Camera(self.logs_path / "camera") self.zoom_level = 3 @@ -161,7 +173,7 @@ def __init__(self) -> None: self.log(f"Setting up imaging node ROS connections") # Subscriptions ---- - self.pose_provider = PoseProvider(self, self.logs_path / "poses") + self.pose_provider = PoseProvider(self, self.logs_path / "poses", 128) self.pose_provider.subscribe(self.cam_auto_point) # Only start the recording once (when the first pose comes in) diff --git a/tests/imaging/drone_tracker_tests.py b/tests/imaging/drone_tracker_tests.py index 0e1df5c3..fc17ff5b 100644 --- a/tests/imaging/drone_tracker_tests.py +++ b/tests/imaging/drone_tracker_tests.py @@ -1,196 +1,196 @@ -from uavf_2024.imaging.drone_tracker import DroneTracker -from uavf_2024.imaging.particle_filter import BoundingBox -from uavf_2024.imaging.camera_model import CameraModel -import unittest -import numpy as np -from numpy import cos, pi, sin, tan -from scipy.spatial.transform import Rotation as R -import matplotlib.pyplot as plt -import os -from tqdm import tqdm - -CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) - -class TestDroneTracker(unittest.TestCase): - def setUp(self): - self.resolution = (1920, 1080) - self.fov = 90 - self.focal_len_pixels = self.resolution[0] / (2 * tan(np.deg2rad(self.fov / 2))) - self.filter = DroneTracker(self.resolution, self.focal_len_pixels) - - def compute_measurement(self, cam_pose: tuple[np.ndarray, R], state: np.ndarray) -> BoundingBox: - ''' - `state` is the state of the track, which is a 7-element array - ''' - # If behind the camera, return a box with 0 area - if np.dot(state[:3] - cam_pose[0], cam_pose[1].apply([0, 0, 1])) < 0: - return BoundingBox(0, 0, 0, 0) - - cam = CameraModel(self.focal_len_pixels, - [self.resolution[0] / 2, self.resolution[1] / 2], - cam_pose[1].as_matrix(), - cam_pose[0].reshape(3, 1)) - - state_position = state[:3] - state_radius = state[-1] - - ray_to_center = state_position - cam_pose[0] - - # Monte Carlo to find the circumscribed rectangle around the sphere's projection into the camera - n_samples = 100 - - # Sample points on the sphere - random_vector = np.random.randn(3, n_samples) - random_vector -= np.dot(random_vector.T, ray_to_center) * np.repeat([ray_to_center / np.linalg.norm(ray_to_center)], n_samples, axis=0).T - random_vector = random_vector / np.linalg.norm(random_vector, axis=0) * state_radius - - # Project points into the camera - projected_points = cam.project(state_position.reshape((3, 1)) + random_vector) - x_min = np.min(projected_points[0]) - x_max = np.max(projected_points[0]) - y_min = np.min(projected_points[1]) - y_max = np.max(projected_points[1]) - - return BoundingBox((x_min + x_max) / 2, (y_min + y_max) / 2, x_max - x_min, y_max - y_min) - - - def save_visualization(self, test_case_name, fig, vis_num): - '''Given a test case name and a number for the visual, saves a figure to a particular folder path''' - folder_path = os.path.join(CURRENT_DIR, 'visualizations', 'drone_tracker', test_case_name) - os.makedirs(folder_path, exist_ok=True) - file_path = os.path.join(folder_path, f'{test_case_name}_visual_{vis_num}.png') - fig.savefig(file_path) - plt.close(fig) +# from uavf_2024.imaging.drone_tracker import DroneTracker +# from uavf_2024.imaging.particle_filter import BoundingBox +# from uavf_2024.imaging.camera_model import CameraModel +# import unittest +# import numpy as np +# from numpy import cos, pi, sin, tan +# from scipy.spatial.transform import Rotation as R +# import matplotlib.pyplot as plt +# import os +# from tqdm import tqdm + +# CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) + +# class TestDroneTracker(unittest.TestCase): +# def setUp(self): +# self.resolution = (1920, 1080) +# self.fov = 90 +# self.focal_len_pixels = self.resolution[0] / (2 * tan(np.deg2rad(self.fov / 2))) +# self.filter = DroneTracker(self.resolution, self.focal_len_pixels) + +# def compute_measurement(self, cam_pose: tuple[np.ndarray, R], state: np.ndarray) -> BoundingBox: +# ''' +# `state` is the state of the track, which is a 7-element array +# ''' +# # If behind the camera, return a box with 0 area +# if np.dot(state[:3] - cam_pose[0], cam_pose[1].apply([0, 0, 1])) < 0: +# return BoundingBox(0, 0, 0, 0) + +# cam = CameraModel(self.focal_len_pixels, +# [self.resolution[0] / 2, self.resolution[1] / 2], +# cam_pose[1].as_matrix(), +# cam_pose[0].reshape(3, 1)) + +# state_position = state[:3] +# state_radius = state[-1] + +# ray_to_center = state_position - cam_pose[0] + +# # Monte Carlo to find the circumscribed rectangle around the sphere's projection into the camera +# n_samples = 100 + +# # Sample points on the sphere +# random_vector = np.random.randn(3, n_samples) +# random_vector -= np.dot(random_vector.T, ray_to_center) * np.repeat([ray_to_center / np.linalg.norm(ray_to_center)], n_samples, axis=0).T +# random_vector = random_vector / np.linalg.norm(random_vector, axis=0) * state_radius + +# # Project points into the camera +# projected_points = cam.project(state_position.reshape((3, 1)) + random_vector) +# x_min = np.min(projected_points[0]) +# x_max = np.max(projected_points[0]) +# y_min = np.min(projected_points[1]) +# y_max = np.max(projected_points[1]) + +# return BoundingBox((x_min + x_max) / 2, (y_min + y_max) / 2, x_max - x_min, y_max - y_min) + + +# def save_visualization(self, test_case_name, fig, vis_num): +# '''Given a test case name and a number for the visual, saves a figure to a particular folder path''' +# folder_path = os.path.join(CURRENT_DIR, 'visualizations', 'drone_tracker', test_case_name) +# os.makedirs(folder_path, exist_ok=True) +# file_path = os.path.join(folder_path, f'{test_case_name}_visual_{vis_num}.png') +# fig.savefig(file_path) +# plt.close(fig) - def moving_target_path(self, cam_positions: np.ndarray, cam_rotations: np.ndarray, target_positions: np.ndarray, test_case_name: str): - ''' - Simulates and visualizes tracking of a moving target by a drone. - - Parameters: - cam_positions (np.ndarray): Camera positions at each time step. - cam_rotations (np.ndarray): Camera orientations at each time step. - target_positions (np.ndarray): Target states (position, velocity, radius) at each time step. - test_case_name (str): Name for saving visualizations. - - This function follows the following process: - 1. Initialize plot with camera and target positions. - 2. For each time step: - a. Compute target bounding box from camera pose. - b. Predict and update filter state. - c. Visualize and save filter state and covariances. - 3. Plot and save variance of filter state estimates over time. - 4. Assert filter convergence to expected final target position. - ''' - - n_cam_samples = len(cam_positions) - - fig, ax = plt.subplots() - ax.set_xlim(-10, 10) - ax.set_ylim(-10, 10) - for step, (cam_pos, cam_rot) in enumerate(zip(cam_positions, cam_rotations)): - look_direction = cam_rot.apply([0, 0, 1]) - ax.scatter(cam_pos[0], cam_pos[2], label='camera position' if step == 0 else "") - ax.plot([cam_pos[0], cam_pos[0] + look_direction[0]], [cam_pos[2], cam_pos[2] + look_direction[2]], 'g', label='camera direction' if step == 0 else "") - ax.plot([target_positions[step][0]], [target_positions[step][2]], 'ro', label='actual position' if step == 0 else "") - ax.legend() - self.save_visualization(test_case_name, fig, 1) - - covariances = [] - fig_bounds = 40 - - for step, (cam_pos, cam_rot) in enumerate(tqdm(zip(cam_positions, cam_rotations))): - target_state = target_positions[step] # Current state of the target - - bbox = self.compute_measurement((cam_pos, cam_rot), target_state) - self.filter.predict(0.5) - self.filter.update((cam_pos, cam_rot), [bbox]) - - fig = self.filter.tracks[0].filter.visualize(fig_bounds) - ax = fig.axes[0] - look_direction = cam_rot.apply([0, 0, 1]) - ax.plot([cam_pos[0], cam_pos[0] + look_direction[0]], [cam_pos[2], cam_pos[2] + look_direction[2]], 'g', label='camera (our drone)') - ax.plot([target_positions[step][0]], [target_positions[step][2]], 'ro', label='actual position') +# def moving_target_path(self, cam_positions: np.ndarray, cam_rotations: np.ndarray, target_positions: np.ndarray, test_case_name: str): +# ''' +# Simulates and visualizes tracking of a moving target by a drone. + +# Parameters: +# cam_positions (np.ndarray): Camera positions at each time step. +# cam_rotations (np.ndarray): Camera orientations at each time step. +# target_positions (np.ndarray): Target states (position, velocity, radius) at each time step. +# test_case_name (str): Name for saving visualizations. + +# This function follows the following process: +# 1. Initialize plot with camera and target positions. +# 2. For each time step: +# a. Compute target bounding box from camera pose. +# b. Predict and update filter state. +# c. Visualize and save filter state and covariances. +# 3. Plot and save variance of filter state estimates over time. +# 4. Assert filter convergence to expected final target position. +# ''' + +# n_cam_samples = len(cam_positions) + +# fig, ax = plt.subplots() +# ax.set_xlim(-10, 10) +# ax.set_ylim(-10, 10) +# for step, (cam_pos, cam_rot) in enumerate(zip(cam_positions, cam_rotations)): +# look_direction = cam_rot.apply([0, 0, 1]) +# ax.scatter(cam_pos[0], cam_pos[2], label='camera position' if step == 0 else "") +# ax.plot([cam_pos[0], cam_pos[0] + look_direction[0]], [cam_pos[2], cam_pos[2] + look_direction[2]], 'g', label='camera direction' if step == 0 else "") +# ax.plot([target_positions[step][0]], [target_positions[step][2]], 'ro', label='actual position' if step == 0 else "") +# ax.legend() +# self.save_visualization(test_case_name, fig, 1) + +# covariances = [] +# fig_bounds = 40 + +# for step, (cam_pos, cam_rot) in enumerate(tqdm(zip(cam_positions, cam_rotations))): +# target_state = target_positions[step] # Current state of the target + +# bbox = self.compute_measurement((cam_pos, cam_rot), target_state) +# self.filter.predict(0.5) +# self.filter.update((cam_pos, cam_rot), [bbox]) + +# fig = self.filter.tracks[0].filter.visualize(fig_bounds) +# ax = fig.axes[0] +# look_direction = cam_rot.apply([0, 0, 1]) +# ax.plot([cam_pos[0], cam_pos[0] + look_direction[0]], [cam_pos[2], cam_pos[2] + look_direction[2]], 'g', label='camera (our drone)') +# ax.plot([target_positions[step][0]], [target_positions[step][2]], 'ro', label='actual position') - mean = self.filter.tracks[0].filter.mean() - cov = np.diag(self.filter.tracks[0].filter.covariance()) - ax.plot([mean[0]], [mean[2]], 'yo', label='estimated position') - fig.legend() - self.save_visualization(test_case_name, fig, step + 2) - covariances.append(np.diag(self.filter.tracks[0].filter.covariance())) +# mean = self.filter.tracks[0].filter.mean() +# cov = np.diag(self.filter.tracks[0].filter.covariance()) +# ax.plot([mean[0]], [mean[2]], 'yo', label='estimated position') +# fig.legend() +# self.save_visualization(test_case_name, fig, step + 2) +# covariances.append(np.diag(self.filter.tracks[0].filter.covariance())) - fig, ax = plt.subplots() - labels = ['x', 'y', 'z', 'vx', 'vy', 'vz', 'r'] - for i in range(len(covariances[0])): - ax.plot([c[i] for c in covariances], label=labels[i]) - ax.legend() - ax.set_title("Variance vs timestep") - self.save_visualization(test_case_name, fig, len(covariances) + 2) +# fig, ax = plt.subplots() +# labels = ['x', 'y', 'z', 'vx', 'vy', 'vz', 'r'] +# for i in range(len(covariances[0])): +# ax.plot([c[i] for c in covariances], label=labels[i]) +# ax.legend() +# ax.set_title("Variance vs timestep") +# self.save_visualization(test_case_name, fig, len(covariances) + 2) - self.assertTrue(len(self.filter.tracks) == 1) - final_state = self.filter.tracks[0].filter.mean() - expected_final_position = target_positions[-1][:3] - self.assertTrue(np.allclose(final_state[:3], expected_final_position, atol=0.1), f"filter converged to {final_state}") +# self.assertTrue(len(self.filter.tracks) == 1) +# final_state = self.filter.tracks[0].filter.mean() +# expected_final_position = target_positions[-1][:3] +# self.assertTrue(np.allclose(final_state[:3], expected_final_position, atol=0.1), f"filter converged to {final_state}") - def test_stationary_target(self): - n_cam_samples = 20 - cam_pos_radius = 10 +# def test_stationary_target(self): +# n_cam_samples = 20 +# cam_pos_radius = 10 - cam_positions = [cam_pos_radius * np.array([sin(2 * pi * i / n_cam_samples), 0, -cos(2 * pi * i / n_cam_samples)]) for i in range(n_cam_samples)] - cam_rotations = [R.from_euler('xyz', [0, -2 * pi * i / n_cam_samples, 0]) for i in range(n_cam_samples)] +# cam_positions = [cam_pos_radius * np.array([sin(2 * pi * i / n_cam_samples), 0, -cos(2 * pi * i / n_cam_samples)]) for i in range(n_cam_samples)] +# cam_rotations = [R.from_euler('xyz', [0, -2 * pi * i / n_cam_samples, 0]) for i in range(n_cam_samples)] - target_positions = np.array([[0, 0, 0, 0, 0, 0, 0.35]] * n_cam_samples) # Stationary target +# target_positions = np.array([[0, 0, 0, 0, 0, 0, 0.35]] * n_cam_samples) # Stationary target - self.moving_target_path(np.array(cam_positions), cam_rotations, target_positions, 'test_stationary_target') +# self.moving_target_path(np.array(cam_positions), cam_rotations, target_positions, 'test_stationary_target') - def test_moving_target_straight_line(self): - cam_positions = np.array([[i, 0, 10] for i in range(20)]) - cam_rotations = [R.from_euler('xyz', [0, 0, 0]) for _ in range(20)] # Camera looking straight +# def test_moving_target_straight_line(self): +# cam_positions = np.array([[i, 0, 10] for i in range(20)]) +# cam_rotations = [R.from_euler('xyz', [0, 0, 0]) for _ in range(20)] # Camera looking straight - target_positions = np.array([[0, i, 10, 0, 1, 0, 0.35] for i in range(20)]) +# target_positions = np.array([[0, i, 10, 0, 1, 0, 0.35] for i in range(20)]) - self.moving_target_path(cam_positions, cam_rotations, target_positions, 'test_moving_target_straight_line') +# self.moving_target_path(cam_positions, cam_rotations, target_positions, 'test_moving_target_straight_line') - def test_moving_target_circling_camera(self): - cam_positions = np.array([[0, 0, 10]] * 20) # Stationary camera - cam_rotations = [R.from_euler('xyz', [0, 0, 0]) for _ in range(20)] # Camera looking straight +# def test_moving_target_circling_camera(self): +# cam_positions = np.array([[0, 0, 10]] * 20) # Stationary camera +# cam_rotations = [R.from_euler('xyz', [0, 0, 0]) for _ in range(20)] # Camera looking straight - angles = np.linspace(0, 2 * np.pi, 20) - target_radius = 5 - target_positions = np.array([[target_radius * np.cos(angle), target_radius * np.sin(angle), 10, - -target_radius * np.sin(angle), target_radius * np.cos(angle), 0, 0.35] for angle in angles]) +# angles = np.linspace(0, 2 * np.pi, 20) +# target_radius = 5 +# target_positions = np.array([[target_radius * np.cos(angle), target_radius * np.sin(angle), 10, +# -target_radius * np.sin(angle), target_radius * np.cos(angle), 0, 0.35] for angle in angles]) - self.moving_target_path(cam_positions, cam_rotations, target_positions, 'test_moving_target_circling_camera') +# self.moving_target_path(cam_positions, cam_rotations, target_positions, 'test_moving_target_circling_camera') - def test_moving_target_spiral_around_camera(self): - cam_positions = np.array([[0, 0, 10]] * 20) # Stationary camera - cam_rotations = [R.from_euler('xyz', [0, 0, 0]) for _ in range(20)] # Camera looking straight - - # Target moves in a spiral path around the camera - num_steps = 20 - angles = np.linspace(0, 4 * np.pi, num_steps) # Two full circles - target_radius = 5 - z_step = 0.5 - target_positions = np.array([[target_radius * np.cos(angle), target_radius * np.sin(angle), 10 + z_step * i, - -target_radius * np.sin(angle), target_radius * np.cos(angle), z_step, 0.35] for i, angle in enumerate(angles)]) - - self.moving_target_path(cam_positions, cam_rotations, target_positions, 'test_moving_target_spiral_around_camera') +# def test_moving_target_spiral_around_camera(self): +# cam_positions = np.array([[0, 0, 10]] * 20) # Stationary camera +# cam_rotations = [R.from_euler('xyz', [0, 0, 0]) for _ in range(20)] # Camera looking straight + +# # Target moves in a spiral path around the camera +# num_steps = 20 +# angles = np.linspace(0, 4 * np.pi, num_steps) # Two full circles +# target_radius = 5 +# z_step = 0.5 +# target_positions = np.array([[target_radius * np.cos(angle), target_radius * np.sin(angle), 10 + z_step * i, +# -target_radius * np.sin(angle), target_radius * np.cos(angle), z_step, 0.35] for i, angle in enumerate(angles)]) + +# self.moving_target_path(cam_positions, cam_rotations, target_positions, 'test_moving_target_spiral_around_camera') - def test_moving_target_spiral_with_camera_movement(self): - # Target moves in a spiral path around the camera - num_steps = 20 - angles = np.linspace(0, 4 * np.pi, num_steps) # Two full circles - target_radius = 5 - z_step = 0.5 - target_positions = np.array([[target_radius * np.cos(angle), target_radius * np.sin(angle), 10 + z_step * i, - -target_radius * np.sin(angle), target_radius * np.cos(angle), z_step, 0.35] for i, angle in enumerate(angles)]) - - # Camera starts behind the target and speeds up to catch up - cam_positions = np.array([[target_radius * np.cos(angle - 0.2), target_radius * np.sin(angle - 0.2), 10 + z_step * i] for i, angle in enumerate(angles)]) - cam_rotations = [R.from_euler('xyz', [0, 0, angle - 0.2]) for angle in angles] - - self.moving_target_path(cam_positions, cam_rotations, target_positions, 'test_moving_target_spiral_with_camera_movement') - -if __name__ == '__main__': - unittest.main() \ No newline at end of file +# def test_moving_target_spiral_with_camera_movement(self): +# # Target moves in a spiral path around the camera +# num_steps = 20 +# angles = np.linspace(0, 4 * np.pi, num_steps) # Two full circles +# target_radius = 5 +# z_step = 0.5 +# target_positions = np.array([[target_radius * np.cos(angle), target_radius * np.sin(angle), 10 + z_step * i, +# -target_radius * np.sin(angle), target_radius * np.cos(angle), z_step, 0.35] for i, angle in enumerate(angles)]) + +# # Camera starts behind the target and speeds up to catch up +# cam_positions = np.array([[target_radius * np.cos(angle - 0.2), target_radius * np.sin(angle - 0.2), 10 + z_step * i] for i, angle in enumerate(angles)]) +# cam_rotations = [R.from_euler('xyz', [0, 0, angle - 0.2]) for angle in angles] + +# self.moving_target_path(cam_positions, cam_rotations, target_positions, 'test_moving_target_spiral_with_camera_movement') + +# if __name__ == '__main__': +# unittest.main() \ No newline at end of file diff --git a/uavf_2024/gnc/commander_node.py b/uavf_2024/gnc/commander_node.py index 6f75b0e3..fd90d000 100644 --- a/uavf_2024/gnc/commander_node.py +++ b/uavf_2024/gnc/commander_node.py @@ -120,20 +120,22 @@ def __init__(self, args): self.got_home_local_pos = False self.home_local_pos = None + self.last_imaging_time = None def log(self, *args, **kwargs): logging.info(*args, **kwargs) def got_state_cb(self, state): self.cur_state = state + timestamp = time.time() + if self.call_imaging_at_wps and (self.last_imaging_time is None or timestamp - self.last_imaging_time < 0.3): + self.do_imaging_call() + self.last_imaging_time = timestamp def reached_cb(self, reached): if reached.wp_seq > self.last_wp_seq: self.log(f"Reached waypoint {reached.wp_seq}") self.last_wp_seq = reached.wp_seq - - if self.call_imaging_at_wps: - self.do_imaging_call() def do_imaging_call(self): self.imaging_futures.append(self.imaging_client.call_async(libuavf_2024.srv.TakePicture.Request())) diff --git a/uavf_2024/gnc/data/greatpark.gpx b/uavf_2024/gnc/data/greatpark.gpx index 748e3f83..fa271af8 100644 --- a/uavf_2024/gnc/data/greatpark.gpx +++ b/uavf_2024/gnc/data/greatpark.gpx @@ -34,7 +34,7 @@ Great Park Mission - + 20.0 diff --git a/uavf_2024/imaging/camera_control.py b/uavf_2024/imaging/camera_control.py index fe55c678..461b9e3a 100644 --- a/uavf_2024/imaging/camera_control.py +++ b/uavf_2024/imaging/camera_control.py @@ -26,7 +26,7 @@ class LogBuffer: Buffer for logging data implementing a Lock for multithreading. """ def __init__(self): - self.log_data = deque(maxlen=512) # ~1gb of data + self.log_data = deque(maxlen=128) self.lock = threading.Lock() def append(self, datum : LogType): @@ -123,7 +123,7 @@ def set_log_dir(self, log_dir: str | Path): self.log_dir = Path(log_dir) def _logging_worker(self): - while self.logging: + while self.logging and self.log_dir: log_data = self.log_buffer.pop_data() if log_data is None: if not self.recording: # finish logging if recording is done @@ -209,7 +209,7 @@ def get_latest_image(self) -> Image | None: def requestAbsolutePosition(self, yaw: float, pitch: float): return self.cam.requestAbsolutePosition(yaw, pitch) - def requestGimbalSpeed(self, yaw_speed: float, pitch_speed: float): + def requestGimbalSpeed(self, yaw_speed: int, pitch_speed: int): return self.cam.requestGimbalSpeed(yaw_speed, pitch_speed) def request_center(self): @@ -282,5 +282,5 @@ def orientation_in_world_frame(drone_rot: Rotation, cam_attitude: np.ndarray) -> cam = Camera() out = cam.get_latest_image() # matplotlib.image.imsave("sample_frame.png",out.get_array().transpose(2,1,0)) - out.save("sample_frame.png") + if out != None: out.save("sample_frame.png") cam.disconnect() \ No newline at end of file