Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add type annotations and improve type safety #1205

Merged
merged 8 commits into from
Feb 11, 2025
39 changes: 27 additions & 12 deletions picamera2/job.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from concurrent.futures import CancelledError, Future
from typing import Callable, Generic, List, Optional, Tuple, TypeVar, cast

T = TypeVar('T')

class Job:

class Job(Generic[T]):
"""
A Job is an operation that can be delegated to the camera event loop to perform
A Job is an operation that can be delegated to the camera event loop to perform.

Such as capturing and returning an image. Most jobs only do a single
thing, like copying out a numpy array, and so consist of a single function
Expand All @@ -20,7 +23,17 @@ class Job:
Picamera2.switch_mode_and_capture_array.
"""

def __init__(self, functions, signal_function=None):
_functions: List[Callable[[], Tuple[bool, T]]]
_future: Future[T]
_signal_function: Optional[Callable[['Job[T]'], None]]
_result: Optional[T]
calls: int

def __init__(
self,
functions: List[Callable[[], Tuple[bool, T]]],
signal_function: Optional[Callable[['Job[T]'], None]] = None,
) -> None:
self._functions = functions
self._future = Future()
self._future.set_running_or_notify_cancel()
Expand All @@ -29,10 +42,11 @@ def __init__(self, functions, signal_function=None):

# I wonder if there is any useful information we could collect, number
# of frames it took for things to finish, maybe intermediate results...
self.calls = 0
self.calls = 0 # Number of times the `execute` method has been called

def execute(self):
"""Try to execute this Job.
def execute(self) -> bool:
"""
Try to execute this Job.

It will return True if it finishes, or False if it needs to be tried again.
"""
Expand Down Expand Up @@ -61,27 +75,28 @@ def execute(self):

return not self._functions

def signal(self):
def signal(self) -> None:
"""Signal that the job is finished."""
assert not self._functions, "Job not finished!"

if not self._future.done():
self._future.set_result(self._result)
self._future.set_result(cast(T, self._result))
if self._signal_function:
self._signal_function(self)

def get_result(self, timeout=None):
def get_result(self, timeout: Optional[float] = None) -> T:
"""This fetches the 'final result' of the job

(being given by the return value of the last function executed). It will block
if necessary for the job to complete.
"""
return self._future.result(timeout=timeout)

def cancel(self):
"""Mark this job as cancelled, so that requesting the result raises a CancelledError.
def cancel(self) -> None:
"""
Mark this job as cancelled, so that requesting the result raises a CancelledError.

User code should not call this because it won't unschedule the job, i.e. remove it
from the job queue. Use Picamera2.cancel_all_and_flush() to cancel and clear all jobs.
"""
self._future.set_exception(CancelledError)
self._future.set_exception(CancelledError())
139 changes: 114 additions & 25 deletions picamera2/picamera2.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#!/usr/bin/python3
"""picamera2 main class"""

from __future__ import annotations

import atexit
import contextlib
import json
Expand All @@ -13,7 +15,8 @@
import time
from enum import Enum
from functools import partial
from typing import Any, Dict, List, Tuple
from typing import (Any, Callable, Dict, List, Literal, Optional, Tuple,
TypedDict, TypeVar, Union, cast, overload)

import libcamera
import numpy as np
Expand Down Expand Up @@ -41,6 +44,8 @@

_log = logging.getLogger(__name__)

T = TypeVar('T')


class Preview(Enum):
"""Enum that applications can pass to the start_preview method."""
Expand All @@ -51,6 +56,25 @@ class Preview(Enum):
QTGL = 3


class GlobalCameraInfo(TypedDict):
"""
TypedDict for camera information fields.

Fields:
Model: The model name of the camera, as advertised by the camera driver.
Location: A number reporting how the camera is mounted, as reported by libcamera.
Rotation: How the camera is rotated for normal operation, as reported by libcamera.
Id: An identifier string for the camera, indicating how the camera is connected.
Num: A camera index.
"""

Model: str
Location: int
Rotation: int
Id: str
Num: int


class CameraManager:
def __init__(self):
self.running = False
Expand Down Expand Up @@ -209,17 +233,18 @@ def find_tuning_algo(tuning: dict, name: str) -> dict:
return next(algo for algo in tuning["algorithms"] if name in algo)[name]

@staticmethod
def global_camera_info() -> list:
"""Return Id string and Model name for all attached cameras, one dict per camera.
def global_camera_info() -> List[GlobalCameraInfo]:
"""
Return Id string and Model name for all attached cameras, one dict per camera.

Ordered correctly by camera number. Also return the location and rotation
of the camera when known, as these may help distinguish which is which.
"""
def describe_camera(cam, num):
def describe_camera(cam, num: int):
info = {k.name: v for k, v in cam.properties.items() if k.name in ("Model", "Location", "Rotation")}
info["Id"] = cam.id
info["Num"] = num
return info
return cast(GlobalCameraInfo, info)
cameras = [describe_camera(cam, i) for i, cam in enumerate(Picamera2._cm.cms.cameras)]
# Sort alphabetically so they are deterministic, but send USB cams to the back of the class.
return sorted(cameras, key=lambda cam: ("/usb" not in cam['Id'], cam['Id']), reverse=True)
Expand Down Expand Up @@ -303,7 +328,7 @@ def _reset_flags(self) -> None:
self.stop_count = 0
self.configure_count = 0
self.frames = 0
self._job_list = []
self._job_list: List[Job] = []
self.options = {}
self._encoders = set()
self.pre_callback = None
Expand Down Expand Up @@ -640,8 +665,22 @@ def close(self) -> None:
self.allocator = Allocator()
_log.info('Camera closed successfully.')

@overload
@staticmethod
def _make_initial_stream_config(
stream_config: Dict, updates: None, ignore_list=[]
) -> None:
...

@overload
@staticmethod
def _make_initial_stream_config(
stream_config: Dict, updates: Dict, ignore_list=[]
) -> Dict:
...

@staticmethod
def _make_initial_stream_config(stream_config: dict, updates: dict, ignore_list=[]) -> dict:
def _make_initial_stream_config(stream_config: dict, updates: Optional[dict], ignore_list=[]) -> Optional[dict]:
"""Take an initial stream_config and add any user updates.

:param stream_config: Stream configuration
Expand Down Expand Up @@ -1015,7 +1054,7 @@ def _update_camera_config(self, camera_config, libcamera_config) -> None:
sensor_config['output_size'] = utils.convert_from_libcamera_type(libcamera_config.sensor_config.output_size)
camera_config['sensor'] = sensor_config

def configure_(self, camera_config="preview") -> None:
def configure_(self, camera_config: Union[CameraConfiguration, str, Dict, None] = "preview") -> None:
"""Configure the camera system with the given configuration.

:param camera_config: Configuration, defaults to the 'preview' configuration
Expand Down Expand Up @@ -1198,7 +1237,7 @@ def cancel_all_and_flush(self) -> None:
job.cancel()
self._job_list = []

def stop_(self, request=None) -> None:
def stop_(self, request: Optional[CompletedRequest] = None) -> Tuple[Literal[True], None]:
"""Stop the camera.

Only call this function directly from within the camera event
Expand Down Expand Up @@ -1330,7 +1369,53 @@ def wait(self, job, timeout=None):
"""
return job.get_result(timeout=timeout)

def dispatch_functions(self, functions, wait, signal_function=None, immediate=False) -> None:
@overload
def dispatch_functions(
self,
functions: List[Callable[[], Tuple[bool, T]]],
wait: bool,
signal_function: Optional[Callable[['Job[T]'], None]] = None,
immediate: bool = False,
) -> T:
...

@overload
def dispatch_functions(
self,
functions: List[Callable[[], Tuple[bool, T]]],
wait: float,
signal_function: Optional[Callable[['Job[T]'], None]] = None,
immediate: bool = False,
) -> T:
...

@overload
def dispatch_functions(
self,
functions: List[Callable[[], Tuple[bool, T]]],
wait: None,
signal_function: Optional[Callable[['Job[T]'], None]] = None,
immediate: bool = False,
) -> Optional[T]:
...

@overload
def dispatch_functions(
self,
functions: List[Callable[[], Tuple[bool, T]]],
wait: bool = False,
signal_function: Optional[Callable[['Job[T]'], None]] = None,
immediate: bool = False,
) -> Job[T]:
...

def dispatch_functions(
self,
functions: List[Callable[[], Tuple[bool, Any]]],
wait: Union[bool, float, None] = False,
signal_function: Optional[Callable[['Job[Any]'], None]] = None,
immediate: bool = False,
) -> Union[T, Job[Any], Optional[T]]:
"""The main thread should use this to dispatch a number of operations for the event loop to perform.

When there are multiple items each will be processed on a separate
Expand Down Expand Up @@ -1392,11 +1477,11 @@ def drop_frames(self, num_frames, wait=None, signal_function=None):
functions = [partial(self.set_frame_drops_, num_frames), self.drop_frames_]
return self.dispatch_functions(functions, wait, signal_function, immediate=True)

def capture_file_(self, file_output, name: str, format=None, exif_data=None) -> dict:
def capture_file_(self, file_output: Any, name: str, format=None, exif_data=None) -> Tuple[bool, Optional[Dict]]:
if not self.completed_requests:
return (False, None)
request = self.completed_requests.pop(0)
if name == "raw" and formats.is_raw(self.camera_config["raw"]["format"]):
if name == "raw" and formats.is_raw(cast(Dict, self.camera_config)["raw"]["format"]):
request.save_dng(file_output)
else:
request.save(name, file_output, format=format, exif_data=exif_data)
Expand All @@ -1412,7 +1497,7 @@ def capture_file(
format=None,
wait=None,
signal_function=None,
exif_data=None) -> dict:
exif_data=None) -> Optional[dict]:
"""Capture an image to a file in the current camera mode.

Return the metadata for the frame captured.
Expand All @@ -1424,7 +1509,7 @@ def capture_file(
exif_data=exif_data)]
return self.dispatch_functions(functions, wait, signal_function)

def switch_mode_(self, camera_config):
def switch_mode_(self, camera_config: Union[str, Dict, None]) -> Tuple[bool, Union[Dict, str, None]]:
self.stop_()
self.configure_(camera_config)
self.start_()
Expand Down Expand Up @@ -1540,7 +1625,7 @@ def captured_sync_request(self, wait=None):
finally:
request.release()

def capture_metadata_(self):
def capture_metadata_(self) -> Tuple[bool, Optional[Dict[str, Any]]]:
if not self.completed_requests:
return (False, None)
request = self.completed_requests.pop(0)
Expand All @@ -1565,7 +1650,7 @@ def capture_buffer(self, name="main", wait=None, signal_function=None):
"""Make a 1d numpy array from the next frame in the named stream."""
return self.dispatch_functions([partial(self.capture_buffer_, name)], wait, signal_function)

def capture_buffers_and_metadata_(self, names) -> Tuple[List[np.ndarray], dict]:
def capture_buffers_and_metadata_(self, names) -> Tuple[bool, Optional[Tuple[List[np.ndarray], Dict[str, Any]]]]:
if not self.completed_requests:
return (False, None)
request = self.completed_requests.pop(0)
Expand Down Expand Up @@ -1615,19 +1700,22 @@ def capture_buffers_and_switch_back_(self, preview_config, names):
partial(capture_buffers_and_switch_back_, self, preview_config, names)]
return self.dispatch_functions(functions, wait, signal_function, immediate=True)

def capture_array_(self, name):
def capture_array_(self, name) -> Tuple[bool, Optional[np.ndarray]]:
if not self.completed_requests:
return (False, None)
request = self.completed_requests.pop(0)
result = request.make_array(name)
request.release()
return (True, result)

def capture_array(self, name="main", wait=None, signal_function=None):
def capture_array(self, name="main", wait=None, signal_function=None) -> Optional[np.ndarray]:
"""Make a 2d image from the next frame in the named stream."""
return self.dispatch_functions([partial(self.capture_array_, name)], wait, signal_function)

def capture_arrays_and_metadata_(self, names) -> Tuple[List[np.ndarray], Dict[str, Any]]:
def capture_arrays_and_metadata_(self, names) -> Union[
Tuple[Literal[False], None],
Tuple[Literal[True], Tuple[List[np.ndarray], Dict[str, Any]]],
]:
if not self.completed_requests:
return (False, None)
request = self.completed_requests.pop(0)
Expand Down Expand Up @@ -1677,7 +1765,7 @@ def capture_arrays_and_switch_back_(self, preview_config, names):
partial(capture_arrays_and_switch_back_, self, preview_config, names)]
return self.dispatch_functions(functions, wait, signal_function, immediate=True)

def capture_image_(self, name: str) -> Image.Image:
def capture_image_(self, name: str) -> Tuple[bool, Optional[Image.Image]]:
"""Capture image

:param name: Stream name
Expand All @@ -1690,7 +1778,8 @@ def capture_image_(self, name: str) -> Image.Image:
request.release()
return (True, result)

def capture_image(self, name: str = "main", wait: bool = None, signal_function=None) -> Image.Image:
def capture_image(self, name: str = "main", wait: Optional[Union[bool, float]] = None,
signal_function=None) -> Optional[Image.Image]:
"""Make a PIL image from the next frame in the named stream.

:param name: Stream name, defaults to "main"
Expand All @@ -1704,15 +1793,15 @@ def capture_image(self, name: str = "main", wait: bool = None, signal_function=N
"""
return self.dispatch_functions([partial(self.capture_image_, name)], wait, signal_function)

def switch_mode_and_capture_image(self, camera_config, name: str = "main", wait: bool = None,
signal_function=None, delay=0) -> Image.Image:
def switch_mode_and_capture_image(self, camera_config, name: str = "main", wait: Optional[Union[bool, float]] = None,
signal_function=None, delay=0) -> Optional[Image.Image]:
"""Switch the camera into a new (capture) mode, capture the image.

Then return back to the initial camera mode.
"""
preview_config = self.camera_config

def capture_image_and_switch_back_(self, preview_config, name) -> Image.Image:
def capture_image_and_switch_back_(self, preview_config, name) -> Tuple[bool, Optional[Image.Image]]:
done, result = self.capture_image_(name)
if not done:
return (False, None)
Expand Down Expand Up @@ -1831,7 +1920,7 @@ def stop_recording(self) -> None:
self.stop()
self.stop_encoder()

def set_overlay(self, overlay) -> None:
def set_overlay(self, overlay: Optional[np.ndarray]) -> None:
"""Display an overlay on the camera image.

The overlay may be either None, in which case any overlay is removed,
Expand Down
Loading