Skip to content

Commit

Permalink
Multi-threaded Camera logging refactor (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
MinhxNguyen7 authored Jun 20, 2024
1 parent c21140c commit b32a247
Showing 1 changed file with 45 additions and 40 deletions.
85 changes: 45 additions & 40 deletions uavf_2024/imaging/camera_control.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import threading
import time
Expand Down Expand Up @@ -88,16 +89,45 @@ def get_interpolated(self, timestamp: float):
'attitude': attitude,
'zoom': zoom
}


class CameraLogger:
"""
Class encapsulating multithreaded logging of images and metadata.
The thread pool will be automatically destroyed when the CameraLogger object is destroyed.
"""
def __init__(self, log_dir: str | Path, max_threads: int = 2):
self.log_dir = Path(log_dir)
self._prep_log_dir(self.log_dir)

self.pool = ThreadPoolExecutor(max_workers=max_threads)

@staticmethod
def _prep_log_dir(log_dir: Path):
if not log_dir.exists():
log_dir.mkdir(parents=True, exist_ok=True)

def log_async(self, image: Image, metadata: dict, timestamp: float):
"""
Asynchronously logs the image and metadata to the log directory.
"""
self.pool.submit(self._log_to_file, image, metadata, timestamp)

def _log_to_file(self, image: Image, metadata: dict, timestamp: float):
image.save(self.log_dir / f"{timestamp}.jpg")
json.dump(
metadata,
open(self.log_dir / f"{timestamp}.json", 'w')
)


class Camera:
def __init__(self, log_dir: str | Path | None = None):
"""
Currently starts recording and logging as soon as constructed.
This should be changed to after takeoff.
"""
self.log_dir = Path(log_dir) if log_dir is not None else None
if self.log_dir:
self._prep_log_dir(self.log_dir)

self.cam = SIYISDK(server_ip = "192.168.144.25", port= 37260,debug=False)
self.stream = SIYISTREAM(server_ip = "192.168.144.25", port = 8554,debug=False)
Expand All @@ -107,36 +137,19 @@ def __init__(self, log_dir: str | Path | None = None):

# Buffer for taking the latest image, logging it, and returning it in get_latest_image
self.buffer = ImageBuffer()
self.log_buffer = LogBuffer()
self.logging_thread: threading.Thread | None = None
self.logging = False

self.recording_thread: threading.Thread | None = None
self.recording = False

# Controls whether images and data are submitted to the `threaded_logger`
if log_dir:
self.threaded_logger = CameraLogger(Path(log_dir))
self.logging = True

self.metadata_buffer = MetadataBuffer()

@staticmethod
def _prep_log_dir(log_dir: Path):
if not log_dir.exists():
log_dir.mkdir(parents=True, exist_ok=True)

def set_log_dir(self, log_dir: str | Path):
self.log_dir = Path(log_dir)

def _logging_worker(self):
while self.logging and self.log_dir:
log_data = self.log_buffer.pop_data()
if log_data is None:
if not self.recording: # finish logging if recording is done
self.logging = False
break
time.sleep(0.1)
continue
image, metadata, timestamp = log_data.get_data()
image.save(self.log_dir / f"{timestamp}.jpg")
json.dump(
metadata,
open(self.log_dir / f"{timestamp}.json", 'w')
)

def _recording_worker(self):
"""
Expand All @@ -161,19 +174,18 @@ def _recording_worker(self):

image = Image(img_arr, HWC)
self.buffer.put(image)

metadata = {
"attitude": attitude_position,
"zoom": zoom,
"time_seconds": attitude_stamp
}
self.metadata_buffer.append(metadata) # allows for interpolation of attitude and zoom
log_data = LogType(image, metadata, img_stamp)
self.log_buffer.append(log_data)

if self.logging:
self.threaded_logger.log_async(image, metadata, img_stamp)

def start_recording(self):
"""
Currently called in __init__, but this should be changed to being called when we're in the air.
"""
if self.recording:
return
self.recording_thread = threading.Thread(target=self._recording_worker)
Expand All @@ -182,17 +194,10 @@ def start_recording(self):
self.start_logging()

def start_logging(self):
if self.logging or not self.log_dir or not self.recording:
return
self.logging_thread = threading.Thread(target=self._logging_worker)
self.logging = True
self.logging_thread.start()

def stop_logging(self):
if self.logging_thread:
self.logging = False
self.logging_thread.join()
self.logging_thread = None
self.logging = False

def stop_recording(self):
if self.recording_thread:
Expand Down

0 comments on commit b32a247

Please sign in to comment.