Skip to content

Commit

Permalink
fix miscellaneous bugs
Browse files Browse the repository at this point in the history
make pose interpolation able to timeout

use correct exception type to catch futures timing out

perception and pose provider now log correctly from ROS

target3d now has a shorter __repr__
  • Loading branch information
SuvirBajaj committed Jun 24, 2024
1 parent b85ca78 commit e5dc6aa
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 20 deletions.
14 changes: 9 additions & 5 deletions scripts/run_perception.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import rclpy
from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from uavf_2024.imaging import Perception, PoseProvider
from concurrent.futures import as_completed
from concurrent.futures import as_completed, TimeoutError
import traceback
import time

class PerceptionMinimalNode(Node):
''' Receives input from every ongoing process.
Expand All @@ -13,32 +15,34 @@ class PerceptionMinimalNode(Node):

def __init__(self) -> None:
super().__init__('perception_test_node')
self.perception = Perception(PoseProvider(self))
self.perception = Perception(PoseProvider(self, logger=self.get_logger()), logger=self.get_logger())
self.timer_period = 1.0 # seconds
self.create_timer(self.timer_period, self.timer_cb)
self.perception_futures = []
self.time_alive = 0
self.done = False

def log(self, msg):
self.get_logger().info(msg)

def timer_cb(self):
self.log("Running timer callback")
self.perception_futures.append(self.perception.get_image_down_async())
self.time_alive += 1
if self.time_alive > 10:
if self.time_alive >= 10:
self.log('Shutting down...')
self.log(f"results: {self.collect_results()}")
self.destroy_node()
rclpy.shutdown()

def collect_results(self):
detections = []
timeout = 2.0
timeout = 1.0
try:
for future in as_completed(self.perception_futures, timeout=timeout):
detections.extend(future.result())

self.log(f"Successfully retrieved imaging detections: {detections}")
self.log(f"Successfully retrieved imaging detections")
except TimeoutError:
self.log("Timed out waiting for imaging detections.")

Expand Down
8 changes: 6 additions & 2 deletions uavf_2024/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ def __init__(
node_context: Node,
logs_dir: str | os.PathLike | Path | None = None,
buffer_size = 64,
logger_name: str | None = None
logger_name: str | None = None,
logger = None
):
"""
Parameters:
Expand All @@ -157,6 +158,9 @@ def __init__(
self.logger = logging.getLogger("RosLoggingProvider" + str(__class__.LOGGER_INDEX))
__class__.LOGGER_INDEX += 1

if logger is not None:
self.logger = logger

self._first_value: LoggingBufferT | None = None

self._subscribers: Subscriptions[LoggingBufferT] = Subscriptions()
Expand All @@ -175,7 +179,7 @@ def __init__(
self.log(f"Finished intializing RosLoggingProvider. Logging to {self._logs_dir}")

def log(self, message, level = logging.INFO):
self.logger.log(level, message)
self.logger.log(message, level)

def set_log_dir(self, new_dir: str | os.PathLike | Path):
self._logs_dir = Path(new_dir)
Expand Down
1 change: 1 addition & 0 deletions uavf_2024/imaging/camera_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def stop_logging(self):

def stop_recording(self):
if self.recording_thread:
self.stop_logging()
self.recording = False
self.recording_thread.join()
self.recording_thread = None
Expand Down
6 changes: 6 additions & 0 deletions uavf_2024/imaging/imaging_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ def from_ros(msg: ROSDetectionMessage) -> Target3D:
),
msg.id
)
def __repr__(self):
certain = self.descriptor.collapse_to_certain()
descriptor_str = f'{certain.shape_col} ({max(self.descriptor.shape_col_probs)}) {certain.shape} ({max(self.descriptor.shape_probs)}) {certain.letter_col} ({max(self.descriptor.letter_col_probs)}) {certain.letter} ({max(self.descriptor.letter_probs)})'
position_str = f'({self.position[0]:.02f}, {self.position[1]:.02f}, {self.position[2]:.02f})'

return f'{descriptor_str} at {position_str} with id {self.id}'

@dataclass
class ROSDetectionMessage:
Expand Down
7 changes: 4 additions & 3 deletions uavf_2024/imaging/perception.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def _get_existing_instance() -> "Perception | None":
"""
return Perception._INSTANCE

def __init__(self, pose_provider: PoseProvider, zoom_level: int = 3, logs_path: Path = LOGS_PATH):
def __init__(self, pose_provider: PoseProvider, zoom_level: int = 3, logs_path: Path = LOGS_PATH, logger = None):
"""
A PoseProvder must be injected because it depends on a MAVROS subscription.
This might not be neccessary in the future if we can get that data from MAVSDK.
Expand All @@ -48,20 +48,21 @@ def __init__(self, pose_provider: PoseProvider, zoom_level: int = 3, logs_path:

print("Initializing Perception. Logging to", LOGS_PATH)

self.logger = logging.getLogger('perception')
self.logger = logging.getLogger('perception') if logger is None else logger

self.zoom_level = zoom_level
self.logs_path = logs_path

# Set up camera
self.camera = Camera(LOGS_PATH / 'camera')
self.camera.setAbsoluteZoom(zoom_level)
self.camera.start_recording()
self.camera_state = False

self.image_processor = ImageProcessor(LOGS_PATH / 'image_processor')

# There can only be one process because it uses the GPU
self.processor_pool = ProcessPoolExecutor(1)
self.processor_pool = ThreadPoolExecutor(1)

self.logging_pool = ThreadPoolExecutor(2)

Expand Down
26 changes: 16 additions & 10 deletions uavf_2024/imaging/pose.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ def format_data(self, message: PoseStamped) -> PoseDatum:
time_seconds = message.header.stamp.sec + message.header.stamp.nanosec / 1e9
)

def _interpolate_from_buffer(self, time_seconds: float, wait: bool = False) -> PoseDatum | None:
def _interpolate_from_buffer(self, time_seconds: float, timeout: float = 0.5) -> PoseDatum | None:
"""
Returns the pose datum interpolated between the two data points before and after the time given.
If this interpolation is not possible because the pose is too old, returns the oldest pose.
If this interpolation is not possible because the poses are not old enough, wait until enough data is available if wait is enabled.
Otherwise, return the newest pose
If there is no pose available, wait if enabled. Otherwise, return None.
If timeout is None, doesn't wait for new data to interpolate, just returns latest one. Otherwise, waits for new data, returning None after the timeout
"""
if self._buffer.count == 0:
return None
Expand All @@ -81,12 +83,19 @@ def _interpolate_from_buffer(self, time_seconds: float, wait: bool = False) -> P
if closest_idx == 0:
return data[0]

# Poll every 100ms
# Poll every 100ms, timeout after 1 sec (configurable)
time_waited = 0
while closest_idx == len(data):
if not wait:
if timeout is None:
return data[closest_idx - 1]

time.sleep(0.1)
poll_period = 0.1
time.sleep(poll_period)
time_waited += poll_period
self.log("Waiting for new pose datum")
if time_waited > timeout:
self.log("Pose interpolation timed out. Returning latest.")
return None
data = self._buffer.get_all_reversed()
closest_idx = bisect_left([d.time_seconds for d in data], time_seconds)

Expand Down Expand Up @@ -120,10 +129,7 @@ def get_interpolated(self, time_seconds: float) -> tuple[PoseDatum, bool]:
regardless of its timestamp
'''
for _ in range(50):
interp_pose = self._interpolate_from_buffer(time_seconds, True)
if interp_pose is not None:
return (interp_pose, True)
else:
time.sleep(0.1)
interp_pose = self._interpolate_from_buffer(time_seconds, True)
if interp_pose is not None:
return (interp_pose, True)
return (self._buffer.get_fresh(), False)

0 comments on commit e5dc6aa

Please sign in to comment.