From 2acfb8b5c602cde408a0ebc1a97095e09dfaa148 Mon Sep 17 00:00:00 2001 From: KarimAziev Date: Thu, 6 Feb 2025 20:24:54 +0200 Subject: [PATCH 1/8] refactor: add type annotations and improve type safety Signed-off-by: KarimAziev --- picamera2/job.py | 41 +++++++--- picamera2/picamera2.py | 95 +++++++++++++++++++--- picamera2/request.py | 177 +++++++++++++++++++++++++++-------------- pyrightconfig.json | 2 +- 4 files changed, 231 insertions(+), 84 deletions(-) diff --git a/picamera2/job.py b/picamera2/job.py index ad5fa399..ffa16d04 100644 --- a/picamera2/job.py +++ b/picamera2/job.py @@ -1,9 +1,13 @@ -from concurrent.futures import CancelledError, Future +from typing import Callable, List, Optional, Tuple, TypeVar, Generic +from concurrent.futures import Future, CancelledError -class Job: +T = TypeVar('T') + + +class Job(Generic[T]): """ - A Job is an operation that can be delegated to the camera event loop to perform + A Job is an operation that can be delegated to the camera event loop to perform. Such as capturing and returning an image. Most jobs only do a single thing, like copying out a numpy array, and so consist of a single function @@ -20,7 +24,17 @@ class Job: Picamera2.switch_mode_and_capture_array. """ - def __init__(self, functions, signal_function=None): + _functions: List[Callable[[], Tuple[bool, T]]] + _future: Future[T] + _signal_function: Optional[Callable[['Job[T]'], None]] + _result: Optional[T] + calls: int + + def __init__( + self, + functions: List[Callable[[], Tuple[bool, T]]], + signal_function: Optional[Callable[['Job[T]'], None]] = None, + ) -> None: self._functions = functions self._future = Future() self._future.set_running_or_notify_cancel() @@ -29,10 +43,11 @@ def __init__(self, functions, signal_function=None): # I wonder if there is any useful information we could collect, number # of frames it took for things to finish, maybe intermediate results... - self.calls = 0 + self.calls = 0 # Number of times the `execute` method has been called - def execute(self): - """Try to execute this Job. + def execute(self) -> bool: + """ + Try to execute this Job. It will return True if it finishes, or False if it needs to be tried again. """ @@ -61,16 +76,17 @@ def execute(self): return not self._functions - def signal(self): + def signal(self) -> None: """Signal that the job is finished.""" assert not self._functions, "Job not finished!" if not self._future.done(): + assert self._result is not None, "Job result is None" self._future.set_result(self._result) if self._signal_function: self._signal_function(self) - def get_result(self, timeout=None): + def get_result(self, timeout: Optional[float] = None) -> T: """This fetches the 'final result' of the job (being given by the return value of the last function executed). It will block @@ -78,10 +94,11 @@ def get_result(self, timeout=None): """ return self._future.result(timeout=timeout) - def cancel(self): - """Mark this job as cancelled, so that requesting the result raises a CancelledError. + def cancel(self) -> None: + """ + Mark this job as cancelled, so that requesting the result raises a CancelledError. User code should not call this because it won't unschedule the job, i.e. remove it from the job queue. Use Picamera2.cancel_all_and_flush() to cancel and clear all jobs. """ - self._future.set_exception(CancelledError) + self._future.set_exception(CancelledError()) diff --git a/picamera2/picamera2.py b/picamera2/picamera2.py index 69392553..dd197f92 100644 --- a/picamera2/picamera2.py +++ b/picamera2/picamera2.py @@ -13,7 +13,18 @@ import time from enum import Enum from functools import partial -from typing import Any, Dict, List, Tuple +from typing import ( + Any, + Dict, + List, + Tuple, + Optional, + overload, + Callable, + TypeVar, + Union, + Literal, +) import libcamera import numpy as np @@ -41,6 +52,7 @@ _log = logging.getLogger(__name__) +T = TypeVar('T') class Preview(Enum): """Enum that applications can pass to the start_preview method.""" @@ -640,8 +652,20 @@ def close(self) -> None: self.allocator = Allocator() _log.info('Camera closed successfully.') + @overload @staticmethod - def _make_initial_stream_config(stream_config: dict, updates: dict, ignore_list=[]) -> dict: + def _make_initial_stream_config( + stream_config: Dict, updates: None, ignore_list=[] + ) -> None: ... + + @overload + @staticmethod + def _make_initial_stream_config( + stream_config: Dict, updates: Dict, ignore_list=[] + ) -> Dict: ... + + @staticmethod + def _make_initial_stream_config(stream_config: dict, updates: Optional[dict], ignore_list=[]) -> Optional[dict]: """Take an initial stream_config and add any user updates. :param stream_config: Stream configuration @@ -1330,7 +1354,49 @@ def wait(self, job, timeout=None): """ return job.get_result(timeout=timeout) - def dispatch_functions(self, functions, wait, signal_function=None, immediate=False) -> None: + @overload + def dispatch_functions( + self, + functions: List[Callable[[], Tuple[bool, T]]], + wait: bool, + signal_function: Optional[Callable[['Job[T]'], None]] = None, + immediate: bool = False, + ) -> T: ... + + @overload + def dispatch_functions( + self, + functions: List[Callable[[], Tuple[bool, T]]], + wait: float, + signal_function: Optional[Callable[['Job[T]'], None]] = None, + immediate: bool = False, + ) -> T: ... + + @overload + def dispatch_functions( + self, + functions: List[Callable[[], Tuple[bool, T]]], + wait: None, + signal_function: Optional[Callable[['Job[T]'], None]] = None, + immediate: bool = False, + ) -> Optional[T]: ... + + @overload + def dispatch_functions( + self, + functions: List[Callable[[], Tuple[bool, T]]], + wait: bool = False, + signal_function: Optional[Callable[['Job[T]'], None]] = None, + immediate: bool = False, + ) -> Job[T]: ... + + def dispatch_functions( + self, + functions: List[Callable[[], Tuple[bool, Any]]], + wait: Union[bool, float, None] = False, + signal_function: Optional[Callable[['Job[Any]'], None]] = None, + immediate: bool = False, + ) -> Union[T, Job[Any], Optional[T]]: """The main thread should use this to dispatch a number of operations for the event loop to perform. When there are multiple items each will be processed on a separate @@ -1540,7 +1606,7 @@ def captured_sync_request(self, wait=None): finally: request.release() - def capture_metadata_(self): + def capture_metadata_(self) -> Tuple[bool, Optional[Dict[str, Any]]]: if not self.completed_requests: return (False, None) request = self.completed_requests.pop(0) @@ -1565,7 +1631,7 @@ def capture_buffer(self, name="main", wait=None, signal_function=None): """Make a 1d numpy array from the next frame in the named stream.""" return self.dispatch_functions([partial(self.capture_buffer_, name)], wait, signal_function) - def capture_buffers_and_metadata_(self, names) -> Tuple[List[np.ndarray], dict]: + def capture_buffers_and_metadata_(self, names) -> Tuple[bool, Optional[Tuple[List[np.ndarray], Dict[str, Any]]]]: if not self.completed_requests: return (False, None) request = self.completed_requests.pop(0) @@ -1615,7 +1681,7 @@ def capture_buffers_and_switch_back_(self, preview_config, names): partial(capture_buffers_and_switch_back_, self, preview_config, names)] return self.dispatch_functions(functions, wait, signal_function, immediate=True) - def capture_array_(self, name): + def capture_array_(self, name) -> Tuple[bool, Optional[np.ndarray]]: if not self.completed_requests: return (False, None) request = self.completed_requests.pop(0) @@ -1623,11 +1689,14 @@ def capture_array_(self, name): request.release() return (True, result) - def capture_array(self, name="main", wait=None, signal_function=None): + def capture_array(self, name="main", wait=None, signal_function=None) -> Optional[np.ndarray]: """Make a 2d image from the next frame in the named stream.""" return self.dispatch_functions([partial(self.capture_array_, name)], wait, signal_function) - def capture_arrays_and_metadata_(self, names) -> Tuple[List[np.ndarray], Dict[str, Any]]: + def capture_arrays_and_metadata_(self, names) -> Union[ + Tuple[Literal[False], None], + Tuple[Literal[True], Tuple[List[np.ndarray], Dict[str, Any]]], + ]: if not self.completed_requests: return (False, None) request = self.completed_requests.pop(0) @@ -1677,7 +1746,7 @@ def capture_arrays_and_switch_back_(self, preview_config, names): partial(capture_arrays_and_switch_back_, self, preview_config, names)] return self.dispatch_functions(functions, wait, signal_function, immediate=True) - def capture_image_(self, name: str) -> Image.Image: + def capture_image_(self, name: str) -> Tuple[bool, Optional[Image.Image]]: """Capture image :param name: Stream name @@ -1690,7 +1759,7 @@ def capture_image_(self, name: str) -> Image.Image: request.release() return (True, result) - def capture_image(self, name: str = "main", wait: bool = None, signal_function=None) -> Image.Image: + def capture_image(self, name: str = "main", wait: Optional[bool] = None, signal_function=None) -> Optional[Image.Image]: """Make a PIL image from the next frame in the named stream. :param name: Stream name, defaults to "main" @@ -1704,15 +1773,15 @@ def capture_image(self, name: str = "main", wait: bool = None, signal_function=N """ return self.dispatch_functions([partial(self.capture_image_, name)], wait, signal_function) - def switch_mode_and_capture_image(self, camera_config, name: str = "main", wait: bool = None, - signal_function=None, delay=0) -> Image.Image: + def switch_mode_and_capture_image(self, camera_config, name: str = "main", wait: Optional[bool] = None, + signal_function=None, delay=0) -> Optional[Image.Image]: """Switch the camera into a new (capture) mode, capture the image. Then return back to the initial camera mode. """ preview_config = self.camera_config - def capture_image_and_switch_back_(self, preview_config, name) -> Image.Image: + def capture_image_and_switch_back_(self, preview_config, name) -> Tuple[bool, Optional[Image.Image]]: done, result = self.capture_image_(name) if not done: return (False, None) diff --git a/picamera2/request.py b/picamera2/request.py index cb7f42f0..7e7f513f 100644 --- a/picamera2/request.py +++ b/picamera2/request.py @@ -1,3 +1,5 @@ +from __future__ import annotations +from typing import Any, Dict, Optional, cast, TYPE_CHECKING import io import logging import time @@ -15,40 +17,56 @@ from .controls import Controls from .sensor_format import SensorFormat from .utils import convert_from_libcamera_type +from typing_extensions import Buffer + + +if TYPE_CHECKING: + from picamera2.picamera2 import Picamera2 _log = logging.getLogger(__name__) class _MappedBuffer: - def __init__(self, request, stream, write=True): + def __init__( + self, request: "CompletedRequest", stream: str, write: bool = True + ) -> None: if isinstance(stream, str): - stream = request.stream_map[stream] + stream = cast(Dict[str, Any], request.stream_map)[stream] + assert request.request is not None self.__fb = request.request.buffers[stream] - self.__sync = request.picam2.allocator.sync(request.picam2.allocator, self.__fb, write) + self.__sync = request.picam2.allocator.sync( + request.picam2.allocator, self.__fb, write + ) - def __enter__(self): + def __enter__(self) -> Any: self.__mm = self.__sync.__enter__() return self.__mm - def __exit__(self, exc_type, exc_value, exc_traceback): + def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> None: self.__sync.__exit__(exc_type, exc_value, exc_traceback) class MappedArray: - def __init__(self, request, stream, reshape=True, write=True): - self.__request = request - self.__stream = stream - self.__buffer = _MappedBuffer(request, stream, write=write) - self.__array = None - self.__reshape = reshape - - def __enter__(self): + def __init__( + self, + request: "CompletedRequest", + stream: str, + reshape: bool = True, + write: bool = True, + ) -> None: + self.__request: "CompletedRequest" = request + self.__stream: str = stream + self.__buffer: _MappedBuffer = _MappedBuffer(request, stream, write=write) + self.__array: Optional[np.ndarray] = None + self.__reshape: bool = reshape + + def __enter__(self) -> "MappedArray": b = self.__buffer.__enter__() array = np.array(b, copy=False, dtype=np.uint8) if self.__reshape: if isinstance(self.__stream, str): - config = self.__request.config[self.__stream] + config = cast(Dict[str, Any], self.__request.config)[self.__stream] fmt = config["format"] w, h = config["size"] stride = config["stride"] @@ -87,40 +105,40 @@ def __enter__(self): self.__array = array return self - def __exit__(self, exc_type, exc_value, exc_traceback): + def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> None: if self.__array is not None: del self.__array self.__buffer.__exit__(exc_type, exc_value, exc_traceback) @property - def array(self): + def array(self) -> Optional[np.ndarray]: return self.__array class CompletedRequest: - def __init__(self, request, picam2): + def __init__(self, request: Any, picam2: "Picamera2") -> None: self.request = request - self.ref_count = 1 + self.ref_count: int = 1 self.lock = picam2.request_lock self.picam2 = picam2 - self.stop_count = picam2.stop_count - self.configure_count = picam2.configure_count - self.config = self.picam2.camera_config.copy() - self.stream_map = self.picam2.stream_map.copy() + self.stop_count: int = picam2.stop_count + self.configure_count: int = picam2.configure_count + self.config = cast(Dict[str, Any], self.picam2.camera_config).copy() + self.stream_map = cast(Dict[str, Any], self.picam2.stream_map).copy() with self.lock: self.syncs = [picam2.allocator.sync(self.picam2.allocator, buffer, False) for buffer in self.request.buffers.values()] self.picam2.allocator.acquire(self.request.buffers) [sync.__enter__() for sync in self.syncs] - def acquire(self): + def acquire(self) -> None: """Acquire a reference to this completed request, which stops it being recycled back to the camera system.""" with self.lock: if self.ref_count == 0: raise RuntimeError("CompletedRequest: acquiring lock with ref_count 0") self.ref_count += 1 - def release(self): + def release(self) -> None: """Release this completed frame back to the camera system (once its reference count reaches zero).""" with self.lock: self.ref_count -= 1 @@ -130,6 +148,7 @@ def release(self): # If the camera has been stopped since this request was returned then we # can't recycle it. if self.picam2.camera and self.stop_count == self.picam2.stop_count and self.picam2.started: + assert self.request is not None self.request.reuse() controls = self.picam2.controls.get_libcamera_controls() for id, value in controls.items(): @@ -137,45 +156,65 @@ def release(self): self.picam2.controls = Controls(self.picam2) self.picam2.camera.queue_request(self.request) [sync.__exit__() for sync in self.syncs] + assert self.request is not None self.picam2.allocator.release(self.request.buffers) self.request = None self.config = None self.stream_map = None - def make_buffer(self, name): - """Make a 1d numpy array from the named stream's buffer.""" - if self.stream_map.get(name, None) is None: - raise RuntimeError(f'Stream {name!r} is not defined') + def make_buffer(self, name: str) -> np.ndarray: + """Make a 1D numpy array from the named stream's buffer.""" + if cast(Dict[str, Any], self.stream_map).get(name) is None: + raise RuntimeError(f"Stream {name!r} is not defined") with _MappedBuffer(self, name, write=False) as b: return np.array(b, dtype=np.uint8) - def get_metadata(self): + def get_metadata(self) -> Dict[str, Any]: """Fetch the metadata corresponding to this completed request.""" metadata = {} + assert self.request is not None for k, v in self.request.metadata.items(): metadata[k.name] = convert_from_libcamera_type(v) return metadata - def make_array(self, name): - """Make a 2d numpy array from the named stream's buffer.""" - return self.picam2.helpers.make_array(self.make_buffer(name), self.config[name]) + def make_array(self, name: str) -> np.ndarray: + """Make a 2D numpy array from the named stream's buffer.""" + return self.picam2.helpers.make_array(self.make_buffer(name), cast(Dict[str, Any], self.config)[name]) - def make_image(self, name, width=None, height=None): + def make_image( + self, name: str, width: Optional[int] = None, height: Optional[int] = None + ) -> Image.Image: """Make a PIL image from the named stream's buffer.""" - return self.picam2.helpers.make_image(self.make_buffer(name), self.config[name], width, height) - - def save(self, name, file_output, format=None, exif_data=None): - """Save a JPEG or PNG image of the named stream's buffer. + return self.picam2.helpers.make_image( + self.make_buffer(name), cast(Dict[str, Any], self.config), width, height + ) + + def save( + self, + name: str, + file_output: Any, + format: Optional[str] = None, + exif_data: Optional[Dict[str, Any]] = None, + ) -> None: + """ + Save a JPEG or PNG image of the named stream's buffer. exif_data - dictionary containing user defined exif data (based on `piexif`). This will overwrite existing exif information generated by picamera2. """ - return self.picam2.helpers.save(self.make_image(name), self.get_metadata(), file_output, - format, exif_data) - - def save_dng(self, file_output, name="raw"): + return self.picam2.helpers.save( + self.make_image(name), + self.get_metadata(), + file_output, + format, + exif_data, + ) + + def save_dng(self, file_output: Any, name: str = "raw") -> None: """Save a DNG RAW image of the raw stream's buffer.""" - return self.picam2.helpers.save_dng(self.make_buffer(name), self.get_metadata(), self.config[name], file_output) + return self.picam2.helpers.save_dng( + self.make_buffer(name), self.get_metadata(), cast(Dict[str, Any], self.config)[name], file_output + ) class Helpers: @@ -184,11 +223,11 @@ class Helpers: In such a way that it can be usefully accessed even without a CompletedRequest object. """ - def __init__(self, picam2): + def __init__(self, picam2: "Picamera2"): self.picam2 = picam2 - def make_array(self, buffer, config): - """Make a 2d numpy array from the named stream's buffer.""" + def make_array(self, buffer: np.ndarray, config: Dict[str, Any]) -> np.ndarray: + """Make a 2D numpy array from the named stream's buffer.""" array = buffer fmt = config["format"] w, h = config["size"] @@ -225,21 +264,32 @@ def make_array(self, buffer, config): # cv2.cvtColor(image, cv2.COLOR_YUV2BGR_YUYV) will convert directly to RGB. image = array.reshape(h, stride // 2, 2) elif fmt == "MJPEG": - image = np.array(Image.open(io.BytesIO(array))) + image = np.array(Image.open(io.BytesIO(cast(Buffer, array)))) elif formats.is_raw(fmt): image = array.reshape((h, stride)) else: raise RuntimeError("Format " + fmt + " not supported") return image - def make_image(self, buffer, config, width=None, height=None): + def make_image( + self, + buffer: np.ndarray, + config: Dict[str, Any], + width: Optional[int] = None, + height: Optional[int] = None + ) -> Image.Image: """Make a PIL image from the named stream's buffer.""" fmt = config["format"] if fmt == "MJPEG": - return Image.open(io.BytesIO(buffer)) + return Image.open(io.BytesIO(cast(Buffer, buffer))) else: rgb = self.make_array(buffer, config) - mode_lookup = {"RGB888": "BGR", "BGR888": "RGB", "XBGR8888": "RGBX", "XRGB8888": "BGRX"} + mode_lookup = { + "RGB888": "BGR", + "BGR888": "RGB", + "XBGR8888": "RGBX", + "XRGB8888": "BGRX", + } if fmt not in mode_lookup: raise RuntimeError(f"Stream format {fmt} not supported for PIL images") mode = mode_lookup[fmt] @@ -280,14 +330,19 @@ def save(self, img, metadata, file_output, format=None, exif_data=None): # Make up some extra EXIF data. if "AnalogueGain" in metadata and "DigitalGain" in metadata: datetime_now = datetime.now().strftime("%Y:%m:%d %H:%M:%S") - zero_ifd = {piexif.ImageIFD.Make: "Raspberry Pi", - piexif.ImageIFD.Model: self.picam2.camera.id, - piexif.ImageIFD.Software: "Picamera2", - piexif.ImageIFD.DateTime: datetime_now} + assert self.picam2.camera is not None + zero_ifd = { + piexif.ImageIFD.Make: "Raspberry Pi", + piexif.ImageIFD.Model: self.picam2.camera.id, + piexif.ImageIFD.Software: "Picamera2", + piexif.ImageIFD.DateTime: datetime_now, + } total_gain = metadata["AnalogueGain"] * metadata["DigitalGain"] - exif_ifd = {piexif.ExifIFD.DateTimeOriginal: datetime_now, - piexif.ExifIFD.ExposureTime: (metadata["ExposureTime"], 1000000), - piexif.ExifIFD.ISOSpeedRatings: int(total_gain * 100)} + exif_ifd = { + piexif.ExifIFD.DateTimeOriginal: datetime_now, + piexif.ExifIFD.ExposureTime: (metadata["ExposureTime"], 1000000), + piexif.ExifIFD.ISOSpeedRatings: int(total_gain * 100), + } exif_dict = {"0th": zero_ifd, "Exif": exif_ifd} # merge user provided exif data, overwriting the defaults exif_dict = exif_dict | exif_data @@ -303,7 +358,13 @@ def save(self, img, metadata, file_output, format=None, exif_data=None): _log.info(f"Saved {self} to file {file_output}.") _log.info(f"Time taken for encode: {(end_time-start_time)*1000} ms.") - def save_dng(self, buffer, metadata, config, file_output): + def save_dng( + self, + buffer: np.ndarray, + metadata: Dict[str, Any], + config: Dict[str, Any], + file_output: Any + ) -> None: """Save a DNG RAW image of the raw stream's buffer.""" start_time = time.monotonic() raw = self.make_array(buffer, config) @@ -334,7 +395,7 @@ def save_dng(self, buffer, metadata, config, file_output): _log.info(f"Saved {self} to file {file_output}.") _log.info(f"Time taken for encode: {(end_time-start_time)*1000} ms.") - def decompress(self, array): + def decompress(self, array: np.ndarray): """Decompress an image buffer that has been compressed with a PiSP compression format.""" # These are the standard configurations used in the drivers. offset = 2048 diff --git a/pyrightconfig.json b/pyrightconfig.json index cb48eb7e..13c36134 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -3,7 +3,7 @@ "exclude": ["tests"], "reportMissingImports": false, "reportMissingTypeStubs": false, - "reportUnknownMemberType": "warning", + "reportUnknownMemberType": false, "reportOptionalMemberAccess": "warning", "reportGeneralTypeIssues": "warning", "reportAttributeAccessIssue": "warning", From dcc7aa8df3ac7738314ae3fd9e474f4390731b5b Mon Sep 17 00:00:00 2001 From: KarimAziev Date: Thu, 6 Feb 2025 22:08:27 +0200 Subject: [PATCH 2/8] refactor: use cast for type safety in job result handling This will preserve original behavior Signed-off-by: KarimAziev --- picamera2/job.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/picamera2/job.py b/picamera2/job.py index ffa16d04..d7ec4ecb 100644 --- a/picamera2/job.py +++ b/picamera2/job.py @@ -1,4 +1,4 @@ -from typing import Callable, List, Optional, Tuple, TypeVar, Generic +from typing import Callable, List, Optional, Tuple, TypeVar, Generic, cast from concurrent.futures import Future, CancelledError @@ -81,8 +81,7 @@ def signal(self) -> None: assert not self._functions, "Job not finished!" if not self._future.done(): - assert self._result is not None, "Job result is None" - self._future.set_result(self._result) + self._future.set_result(cast(T, self._result)) if self._signal_function: self._signal_function(self) From ac0582d72ee1699da82a985abf35a883dd660691 Mon Sep 17 00:00:00 2001 From: KarimAziev Date: Thu, 6 Feb 2025 22:20:44 +0200 Subject: [PATCH 3/8] refactor: improve some type annotations Signed-off-by: KarimAziev --- picamera2/picamera2.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/picamera2/picamera2.py b/picamera2/picamera2.py index dd197f92..c4574382 100644 --- a/picamera2/picamera2.py +++ b/picamera2/picamera2.py @@ -24,6 +24,7 @@ TypeVar, Union, Literal, + cast, ) import libcamera @@ -315,7 +316,7 @@ def _reset_flags(self) -> None: self.stop_count = 0 self.configure_count = 0 self.frames = 0 - self._job_list = [] + self._job_list: List[Job] = [] self.options = {} self._encoders = set() self.pre_callback = None @@ -1039,7 +1040,7 @@ def _update_camera_config(self, camera_config, libcamera_config) -> None: sensor_config['output_size'] = utils.convert_from_libcamera_type(libcamera_config.sensor_config.output_size) camera_config['sensor'] = sensor_config - def configure_(self, camera_config="preview") -> None: + def configure_(self, camera_config: Union[CameraConfiguration, str, Dict, None] = "preview") -> None: """Configure the camera system with the given configuration. :param camera_config: Configuration, defaults to the 'preview' configuration @@ -1222,7 +1223,7 @@ def cancel_all_and_flush(self) -> None: job.cancel() self._job_list = [] - def stop_(self, request=None) -> None: + def stop_(self, request: Optional[CompletedRequest] = None) -> Tuple[Literal[True], None]: """Stop the camera. Only call this function directly from within the camera event @@ -1458,11 +1459,11 @@ def drop_frames(self, num_frames, wait=None, signal_function=None): functions = [partial(self.set_frame_drops_, num_frames), self.drop_frames_] return self.dispatch_functions(functions, wait, signal_function, immediate=True) - def capture_file_(self, file_output, name: str, format=None, exif_data=None) -> dict: + def capture_file_(self, file_output: Any, name: str, format=None, exif_data=None) -> Tuple[bool, Optional[Dict]]: if not self.completed_requests: return (False, None) request = self.completed_requests.pop(0) - if name == "raw" and formats.is_raw(self.camera_config["raw"]["format"]): + if name == "raw" and formats.is_raw(cast(Dict, self.camera_config)["raw"]["format"]): request.save_dng(file_output) else: request.save(name, file_output, format=format, exif_data=exif_data) @@ -1478,7 +1479,7 @@ def capture_file( format=None, wait=None, signal_function=None, - exif_data=None) -> dict: + exif_data=None) -> Optional[dict]: """Capture an image to a file in the current camera mode. Return the metadata for the frame captured. @@ -1490,7 +1491,7 @@ def capture_file( exif_data=exif_data)] return self.dispatch_functions(functions, wait, signal_function) - def switch_mode_(self, camera_config): + def switch_mode_(self, camera_config: Union[str, Dict, None]) -> Tuple[bool, Union[Dict, str, None]]: self.stop_() self.configure_(camera_config) self.start_() @@ -1759,7 +1760,8 @@ def capture_image_(self, name: str) -> Tuple[bool, Optional[Image.Image]]: request.release() return (True, result) - def capture_image(self, name: str = "main", wait: Optional[bool] = None, signal_function=None) -> Optional[Image.Image]: + def capture_image(self, name: str = "main", wait: Optional[Union[bool, float]] = None, + signal_function=None) -> Optional[Image.Image]: """Make a PIL image from the next frame in the named stream. :param name: Stream name, defaults to "main" @@ -1773,7 +1775,7 @@ def capture_image(self, name: str = "main", wait: Optional[bool] = None, signal_ """ return self.dispatch_functions([partial(self.capture_image_, name)], wait, signal_function) - def switch_mode_and_capture_image(self, camera_config, name: str = "main", wait: Optional[bool] = None, + def switch_mode_and_capture_image(self, camera_config, name: str = "main", wait: Optional[Union[bool, float]] = None, signal_function=None, delay=0) -> Optional[Image.Image]: """Switch the camera into a new (capture) mode, capture the image. From d1189801e87c37ff0fc34b59cccd8fb6c4620938 Mon Sep 17 00:00:00 2001 From: KarimAziev Date: Fri, 7 Feb 2025 10:48:29 +0200 Subject: [PATCH 4/8] refactor: Improve typings for global_camera_info Signed-off-by: KarimAziev --- picamera2/picamera2.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/picamera2/picamera2.py b/picamera2/picamera2.py index c4574382..0666ee64 100644 --- a/picamera2/picamera2.py +++ b/picamera2/picamera2.py @@ -25,6 +25,7 @@ Union, Literal, cast, + TypedDict, ) import libcamera @@ -63,6 +64,24 @@ class Preview(Enum): QT = 2 QTGL = 3 +class GlobalCameraInfo(TypedDict): + """ + TypedDict for camera information fields. + + Fields: + Model: The model name of the camera, as advertised by the camera driver. + Location: A number reporting how the camera is mounted, as reported by libcamera. + Rotation: How the camera is rotated for normal operation, as reported by libcamera. + Id: An identifier string for the camera, indicating how the camera is connected. + Num: A camera index. + """ + Model: str + Location: int + Rotation: int + Id: str + Num: int + + class CameraManager: def __init__(self): @@ -222,17 +241,18 @@ def find_tuning_algo(tuning: dict, name: str) -> dict: return next(algo for algo in tuning["algorithms"] if name in algo)[name] @staticmethod - def global_camera_info() -> list: - """Return Id string and Model name for all attached cameras, one dict per camera. + def global_camera_info() -> List[GlobalCameraInfo]: + """ + Return Id string and Model name for all attached cameras, one dict per camera. Ordered correctly by camera number. Also return the location and rotation of the camera when known, as these may help distinguish which is which. """ - def describe_camera(cam, num): + def describe_camera(cam, num: int): info = {k.name: v for k, v in cam.properties.items() if k.name in ("Model", "Location", "Rotation")} info["Id"] = cam.id info["Num"] = num - return info + return cast(GlobalCameraInfo, info) cameras = [describe_camera(cam, i) for i, cam in enumerate(Picamera2._cm.cms.cameras)] # Sort alphabetically so they are deterministic, but send USB cams to the back of the class. return sorted(cameras, key=lambda cam: ("/usb" not in cam['Id'], cam['Id']), reverse=True) @@ -1902,7 +1922,7 @@ def stop_recording(self) -> None: self.stop() self.stop_encoder() - def set_overlay(self, overlay) -> None: + def set_overlay(self, overlay: Optional[np.ndarray]) -> None: """Display an overlay on the camera image. The overlay may be either None, in which case any overlay is removed, From 234b8b986ebfc9bbae36ad992f0f5eacd8229c36 Mon Sep 17 00:00:00 2001 From: KarimAziev Date: Fri, 7 Feb 2025 11:52:17 +0200 Subject: [PATCH 5/8] style: don't format code with black, keep original code style Signed-off-by: KarimAziev --- picamera2/request.py | 99 +++++++++++++------------------------------- 1 file changed, 28 insertions(+), 71 deletions(-) diff --git a/picamera2/request.py b/picamera2/request.py index 7e7f513f..901c3838 100644 --- a/picamera2/request.py +++ b/picamera2/request.py @@ -1,5 +1,5 @@ from __future__ import annotations -from typing import Any, Dict, Optional, cast, TYPE_CHECKING +from typing import Any, Dict, Optional, cast, TYPE_CHECKING, Union import io import logging import time @@ -27,16 +27,12 @@ class _MappedBuffer: - def __init__( - self, request: "CompletedRequest", stream: str, write: bool = True - ) -> None: + def __init__(self, request: "CompletedRequest", stream: str, write: bool = True) -> None: if isinstance(stream, str): stream = cast(Dict[str, Any], request.stream_map)[stream] assert request.request is not None self.__fb = request.request.buffers[stream] - self.__sync = request.picam2.allocator.sync( - request.picam2.allocator, self.__fb, write - ) + self.__sync = request.picam2.allocator.sync(request.picam2.allocator, self.__fb, write) def __enter__(self) -> Any: self.__mm = self.__sync.__enter__() @@ -47,13 +43,7 @@ def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> None: class MappedArray: - def __init__( - self, - request: "CompletedRequest", - stream: str, - reshape: bool = True, - write: bool = True, - ) -> None: + def __init__(self, request: "CompletedRequest", stream: str, reshape: bool = True, write: bool = True) -> None: self.__request: "CompletedRequest" = request self.__stream: str = stream self.__buffer: _MappedBuffer = _MappedBuffer(request, stream, write=write) @@ -165,7 +155,7 @@ def release(self) -> None: def make_buffer(self, name: str) -> np.ndarray: """Make a 1D numpy array from the named stream's buffer.""" if cast(Dict[str, Any], self.stream_map).get(name) is None: - raise RuntimeError(f"Stream {name!r} is not defined") + raise RuntimeError(f'Stream {name!r} is not defined') with _MappedBuffer(self, name, write=False) as b: return np.array(b, dtype=np.uint8) @@ -181,40 +171,25 @@ def make_array(self, name: str) -> np.ndarray: """Make a 2D numpy array from the named stream's buffer.""" return self.picam2.helpers.make_array(self.make_buffer(name), cast(Dict[str, Any], self.config)[name]) - def make_image( - self, name: str, width: Optional[int] = None, height: Optional[int] = None - ) -> Image.Image: + def make_image(self, name: str, width: Optional[int] = None, height: Optional[int] = None) -> Image.Image: """Make a PIL image from the named stream's buffer.""" - return self.picam2.helpers.make_image( - self.make_buffer(name), cast(Dict[str, Any], self.config), width, height - ) - - def save( - self, - name: str, - file_output: Any, - format: Optional[str] = None, - exif_data: Optional[Dict[str, Any]] = None, - ) -> None: + return self.picam2.helpers.make_image(self.make_buffer(name), cast(Dict[str, Any], self.config), width, height) + + def save(self, name: str, file_output: Any, format: Optional[str] = None, + exif_data: Optional[Dict[str, Any]] = None) -> None: """ Save a JPEG or PNG image of the named stream's buffer. exif_data - dictionary containing user defined exif data (based on `piexif`). This will overwrite existing exif information generated by picamera2. """ - return self.picam2.helpers.save( - self.make_image(name), - self.get_metadata(), - file_output, - format, - exif_data, - ) + return self.picam2.helpers.save(self.make_image(name), self.get_metadata(), file_output, + format, exif_data) def save_dng(self, file_output: Any, name: str = "raw") -> None: """Save a DNG RAW image of the raw stream's buffer.""" - return self.picam2.helpers.save_dng( - self.make_buffer(name), self.get_metadata(), cast(Dict[str, Any], self.config)[name], file_output - ) + return self.picam2.helpers.save_dng(self.make_buffer(name), self.get_metadata(), + cast(Dict[str, Any], self.config)[name], file_output) class Helpers: @@ -271,25 +246,16 @@ def make_array(self, buffer: np.ndarray, config: Dict[str, Any]) -> np.ndarray: raise RuntimeError("Format " + fmt + " not supported") return image - def make_image( - self, - buffer: np.ndarray, - config: Dict[str, Any], - width: Optional[int] = None, - height: Optional[int] = None - ) -> Image.Image: + + def make_image(self, buffer: np.ndarray, config: Dict[str, Any], width: Optional[int] = None, + height: Optional[int] = None) -> Image.Image: """Make a PIL image from the named stream's buffer.""" fmt = config["format"] if fmt == "MJPEG": return Image.open(io.BytesIO(cast(Buffer, buffer))) else: rgb = self.make_array(buffer, config) - mode_lookup = { - "RGB888": "BGR", - "BGR888": "RGB", - "XBGR8888": "RGBX", - "XRGB8888": "BGRX", - } + mode_lookup = {"RGB888": "BGR", "BGR888": "RGB", "XBGR8888": "RGBX", "XRGB8888": "BGRX"} if fmt not in mode_lookup: raise RuntimeError(f"Stream format {fmt} not supported for PIL images") mode = mode_lookup[fmt] @@ -303,7 +269,8 @@ def make_image( pil_img = pil_img.resize((width, height)) return pil_img - def save(self, img, metadata, file_output, format=None, exif_data=None): + def save(self, img: Image.Image, metadata: Dict[str, Any], file_output: Union[str, Path], format: Optional[str] = None, + exif_data: Optional[Dict] = None) -> None: """Save a JPEG or PNG image of the named stream's buffer. exif_data - dictionary containing user defined exif data (based on `piexif`). This will @@ -331,18 +298,14 @@ def save(self, img, metadata, file_output, format=None, exif_data=None): if "AnalogueGain" in metadata and "DigitalGain" in metadata: datetime_now = datetime.now().strftime("%Y:%m:%d %H:%M:%S") assert self.picam2.camera is not None - zero_ifd = { - piexif.ImageIFD.Make: "Raspberry Pi", - piexif.ImageIFD.Model: self.picam2.camera.id, - piexif.ImageIFD.Software: "Picamera2", - piexif.ImageIFD.DateTime: datetime_now, - } + zero_ifd = {piexif.ImageIFD.Make: "Raspberry Pi", + piexif.ImageIFD.Model: self.picam2.camera.id, + piexif.ImageIFD.Software: "Picamera2", + piexif.ImageIFD.DateTime: datetime_now} total_gain = metadata["AnalogueGain"] * metadata["DigitalGain"] - exif_ifd = { - piexif.ExifIFD.DateTimeOriginal: datetime_now, - piexif.ExifIFD.ExposureTime: (metadata["ExposureTime"], 1000000), - piexif.ExifIFD.ISOSpeedRatings: int(total_gain * 100), - } + exif_ifd = {piexif.ExifIFD.DateTimeOriginal: datetime_now, + piexif.ExifIFD.ExposureTime: (metadata["ExposureTime"], 1000000), + piexif.ExifIFD.ISOSpeedRatings: int(total_gain * 100)} exif_dict = {"0th": zero_ifd, "Exif": exif_ifd} # merge user provided exif data, overwriting the defaults exif_dict = exif_dict | exif_data @@ -358,13 +321,7 @@ def save(self, img, metadata, file_output, format=None, exif_data=None): _log.info(f"Saved {self} to file {file_output}.") _log.info(f"Time taken for encode: {(end_time-start_time)*1000} ms.") - def save_dng( - self, - buffer: np.ndarray, - metadata: Dict[str, Any], - config: Dict[str, Any], - file_output: Any - ) -> None: + def save_dng(self, buffer: np.ndarray, metadata: Dict[str, Any], config: Dict[str, Any], file_output: Any) -> None: """Save a DNG RAW image of the raw stream's buffer.""" start_time = time.monotonic() raw = self.make_array(buffer, config) From 7b9e9a6b896ccb9f1aa060c7a1e257a28e6754ac Mon Sep 17 00:00:00 2001 From: KarimAziev Date: Fri, 7 Feb 2025 11:57:59 +0200 Subject: [PATCH 6/8] fix: correct image buffer handling in make_image method Signed-off-by: KarimAziev --- picamera2/request.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/picamera2/request.py b/picamera2/request.py index 901c3838..5ec7f31a 100644 --- a/picamera2/request.py +++ b/picamera2/request.py @@ -173,7 +173,7 @@ def make_array(self, name: str) -> np.ndarray: def make_image(self, name: str, width: Optional[int] = None, height: Optional[int] = None) -> Image.Image: """Make a PIL image from the named stream's buffer.""" - return self.picam2.helpers.make_image(self.make_buffer(name), cast(Dict[str, Any], self.config), width, height) + return self.picam2.helpers.make_image(self.make_buffer(name), cast(Dict[str, Any], self.config)[name], width, height) def save(self, name: str, file_output: Any, format: Optional[str] = None, exif_data: Optional[Dict[str, Any]] = None) -> None: From 5dab3d973cf659c3b3a6988cf20aebbf40cabefe Mon Sep 17 00:00:00 2001 From: KarimAziev Date: Fri, 7 Feb 2025 12:40:57 +0200 Subject: [PATCH 7/8] style: fix flake8 issues Signed-off-by: KarimAziev --- picamera2/job.py | 5 ++--- picamera2/picamera2.py | 42 ++++++++++++++++++++---------------------- picamera2/request.py | 7 +++---- 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/picamera2/job.py b/picamera2/job.py index d7ec4ecb..8f10743d 100644 --- a/picamera2/job.py +++ b/picamera2/job.py @@ -1,6 +1,5 @@ -from typing import Callable, List, Optional, Tuple, TypeVar, Generic, cast -from concurrent.futures import Future, CancelledError - +from concurrent.futures import CancelledError, Future +from typing import Callable, Generic, List, Optional, Tuple, TypeVar, cast T = TypeVar('T') diff --git a/picamera2/picamera2.py b/picamera2/picamera2.py index 0666ee64..533c81af 100644 --- a/picamera2/picamera2.py +++ b/picamera2/picamera2.py @@ -1,6 +1,8 @@ #!/usr/bin/python3 """picamera2 main class""" +from __future__ import annotations + import atexit import contextlib import json @@ -13,20 +15,8 @@ import time from enum import Enum from functools import partial -from typing import ( - Any, - Dict, - List, - Tuple, - Optional, - overload, - Callable, - TypeVar, - Union, - Literal, - cast, - TypedDict, -) +from typing import (Any, Callable, Dict, List, Literal, Optional, Tuple, + TypedDict, TypeVar, Union, cast, overload) import libcamera import numpy as np @@ -56,6 +46,7 @@ T = TypeVar('T') + class Preview(Enum): """Enum that applications can pass to the start_preview method.""" @@ -64,6 +55,7 @@ class Preview(Enum): QT = 2 QTGL = 3 + class GlobalCameraInfo(TypedDict): """ TypedDict for camera information fields. @@ -75,6 +67,7 @@ class GlobalCameraInfo(TypedDict): Id: An identifier string for the camera, indicating how the camera is connected. Num: A camera index. """ + Model: str Location: int Rotation: int @@ -82,7 +75,6 @@ class GlobalCameraInfo(TypedDict): Num: int - class CameraManager: def __init__(self): self.running = False @@ -677,16 +669,18 @@ def close(self) -> None: @staticmethod def _make_initial_stream_config( stream_config: Dict, updates: None, ignore_list=[] - ) -> None: ... + ) -> None: + ... @overload @staticmethod def _make_initial_stream_config( stream_config: Dict, updates: Dict, ignore_list=[] - ) -> Dict: ... + ) -> Dict: + ... @staticmethod - def _make_initial_stream_config(stream_config: dict, updates: Optional[dict], ignore_list=[]) -> Optional[dict]: + def _make_initial_stream_config(stream_config: dict, updates: Optional[dict], ignore_list=[]) -> Optional[dict]: """Take an initial stream_config and add any user updates. :param stream_config: Stream configuration @@ -1382,7 +1376,8 @@ def dispatch_functions( wait: bool, signal_function: Optional[Callable[['Job[T]'], None]] = None, immediate: bool = False, - ) -> T: ... + ) -> T: + ... @overload def dispatch_functions( @@ -1391,7 +1386,8 @@ def dispatch_functions( wait: float, signal_function: Optional[Callable[['Job[T]'], None]] = None, immediate: bool = False, - ) -> T: ... + ) -> T: + ... @overload def dispatch_functions( @@ -1400,7 +1396,8 @@ def dispatch_functions( wait: None, signal_function: Optional[Callable[['Job[T]'], None]] = None, immediate: bool = False, - ) -> Optional[T]: ... + ) -> Optional[T]: + ... @overload def dispatch_functions( @@ -1409,7 +1406,8 @@ def dispatch_functions( wait: bool = False, signal_function: Optional[Callable[['Job[T]'], None]] = None, immediate: bool = False, - ) -> Job[T]: ... + ) -> Job[T]: + ... def dispatch_functions( self, diff --git a/picamera2/request.py b/picamera2/request.py index 5ec7f31a..1ab92921 100644 --- a/picamera2/request.py +++ b/picamera2/request.py @@ -1,24 +1,24 @@ from __future__ import annotations -from typing import Any, Dict, Optional, cast, TYPE_CHECKING, Union + import io import logging import time from datetime import datetime from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, Optional, Union, cast import numpy as np import piexif from pidng.camdefs import Picamera2Camera from pidng.core import PICAM2DNG from PIL import Image +from typing_extensions import Buffer import picamera2.formats as formats from .controls import Controls from .sensor_format import SensorFormat from .utils import convert_from_libcamera_type -from typing_extensions import Buffer - if TYPE_CHECKING: from picamera2.picamera2 import Picamera2 @@ -246,7 +246,6 @@ def make_array(self, buffer: np.ndarray, config: Dict[str, Any]) -> np.ndarray: raise RuntimeError("Format " + fmt + " not supported") return image - def make_image(self, buffer: np.ndarray, config: Dict[str, Any], width: Optional[int] = None, height: Optional[int] = None) -> Image.Image: """Make a PIL image from the named stream's buffer.""" From 00065d474964b2128a7681b78ff42032f3aa0208 Mon Sep 17 00:00:00 2001 From: KarimAziev Date: Tue, 11 Feb 2025 18:33:00 +0200 Subject: [PATCH 8/8] refactor: remove typing_extensions.Buffer Signed-off-by: KarimAziev --- picamera2/request.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/picamera2/request.py b/picamera2/request.py index 1ab92921..70a58d35 100644 --- a/picamera2/request.py +++ b/picamera2/request.py @@ -12,7 +12,6 @@ from pidng.camdefs import Picamera2Camera from pidng.core import PICAM2DNG from PIL import Image -from typing_extensions import Buffer import picamera2.formats as formats @@ -239,7 +238,7 @@ def make_array(self, buffer: np.ndarray, config: Dict[str, Any]) -> np.ndarray: # cv2.cvtColor(image, cv2.COLOR_YUV2BGR_YUYV) will convert directly to RGB. image = array.reshape(h, stride // 2, 2) elif fmt == "MJPEG": - image = np.array(Image.open(io.BytesIO(cast(Buffer, array)))) + image = np.array(Image.open(io.BytesIO(array))) # type: ignore elif formats.is_raw(fmt): image = array.reshape((h, stride)) else: @@ -251,7 +250,7 @@ def make_image(self, buffer: np.ndarray, config: Dict[str, Any], width: Optional """Make a PIL image from the named stream's buffer.""" fmt = config["format"] if fmt == "MJPEG": - return Image.open(io.BytesIO(cast(Buffer, buffer))) + return Image.open(io.BytesIO(buffer)) # type: ignore else: rgb = self.make_array(buffer, config) mode_lookup = {"RGB888": "BGR", "BGR888": "RGB", "XBGR8888": "RGBX", "XRGB8888": "BGRX"} @@ -265,7 +264,7 @@ def make_image(self, buffer: np.ndarray, config: Dict[str, Any], width: Optional height = rgb.shape[0] if width != rgb.shape[1] or height != rgb.shape[0]: # This will be slow. Consider requesting camera images of this size in the first place! - pil_img = pil_img.resize((width, height)) + pil_img = pil_img.resize((width, height)) # type: ignore return pil_img def save(self, img: Image.Image, metadata: Dict[str, Any], file_output: Union[str, Path], format: Optional[str] = None,