diff --git a/example/devices.py b/example/devices.py new file mode 100644 index 0000000..6585cb5 --- /dev/null +++ b/example/devices.py @@ -0,0 +1,13 @@ +from pyk4a import PyK4A, connected_device_count + + +cnt = connected_device_count() +if not cnt: + print("No devices available") + exit() +print(f"Available devices: {cnt}") +for device_id in range(cnt): + device = PyK4A(device_id=device_id) + device.open() + print(f"{device_id}: {device.serial}") + device.close() diff --git a/example/record.py b/example/record.py new file mode 100644 index 0000000..1d4791e --- /dev/null +++ b/example/record.py @@ -0,0 +1,29 @@ +from argparse import ArgumentParser + +from pyk4a import Config, ImageFormat, PyK4A, PyK4ARecord + + +parser = ArgumentParser(description="pyk4a recorder") +parser.add_argument("--device", type=int, help="Device ID", default=0) +parser.add_argument("FILE", type=str, help="Path to MKV file") +args = parser.parse_args() + +print(f"Starting device #{args.device}") +config = Config(color_format=ImageFormat.COLOR_MJPG) +device = PyK4A(config=config, device_id=args.device) +device.start() + +print(f"Open record file {args.FILE}") +record = PyK4ARecord(device=device, config=config, path=args.FILE) +record.create() +try: + print("Recording... Press CTRL-C to stop recording.") + while True: + capture = device.get_capture() + record.write_capture(capture) +except KeyboardInterrupt: + print("CTRL-C pressed. Exiting.") + +record.flush() +record.close() +print(f"{record.captures_count} frames written.") diff --git a/pyk4a/__init__.py b/pyk4a/__init__.py index 3ffa9c4..b3f1c56 100644 --- a/pyk4a/__init__.py +++ b/pyk4a/__init__.py @@ -1,3 +1,5 @@ +import k4a_module + from .calibration import Calibration, CalibrationType from .capture import PyK4ACapture from .config import ( @@ -13,6 +15,7 @@ from .errors import K4AException, K4ATimeoutException from .playback import PyK4APlayback, SeekOrigin from .pyk4a import ColorControlCapabilities, PyK4A +from .record import PyK4ARecord from .transformation import ( color_image_to_depth_camera, depth_image_to_color_camera, @@ -21,6 +24,10 @@ ) +def connected_device_count() -> int: + return k4a_module.device_get_installed_count() + + __all__ = ( "Calibration", "CalibrationType", @@ -43,4 +50,6 @@ "depth_image_to_point_cloud", "depth_image_to_color_camera", "depth_image_to_color_camera_custom", + "PyK4ARecord", + "connected_device_count", ) diff --git a/pyk4a/calibration.py b/pyk4a/calibration.py index deb4db7..98ba9f3 100644 --- a/pyk4a/calibration.py +++ b/pyk4a/calibration.py @@ -1,6 +1,8 @@ from enum import IntEnum from typing import Optional, Tuple +import numpy as np + import k4a_module from .config import ColorResolution, DepthMode @@ -157,3 +159,28 @@ def transformation_handle(self) -> object: raise K4AException("Cannot create transformation handle") self._transformation_handle = handle return self._transformation_handle + + def get_camera_matrix(self, camera: CalibrationType) -> np.ndarray: + """ + Get the camera matrix (in OpenCV compatible format) for the color or depth camera + """ + if camera not in [CalibrationType.COLOR, CalibrationType.DEPTH]: + raise ValueError("Camera matrix only available for color and depth cameras.") + params = k4a_module.calibration_get_intrinsics(self._calibration_handle, self.thread_safe, camera) + if len(params) != 14: + raise ValueError("Unknown camera calibration type") + + cx, cy, fx, fy = params[:4] + return np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]]) + + def get_distortion_coefficients(self, camera: CalibrationType) -> np.ndarray: + """ + Get the distortion coefficients (in OpenCV compatible format) for the color or depth camera + """ + if camera not in [CalibrationType.COLOR, CalibrationType.DEPTH]: + raise ValueError("Distortion coefficients only available for color and depth cameras.") + params = k4a_module.calibration_get_intrinsics(self._calibration_handle, self.thread_safe, camera) + if len(params) != 14: + raise ValueError("Unknown camera calibration type") + + return np.array([params[4], params[5], params[13], params[12], *params[6:10]]) diff --git a/pyk4a/capture.py b/pyk4a/capture.py index 4e479ef..2ab2af8 100644 --- a/pyk4a/capture.py +++ b/pyk4a/capture.py @@ -6,6 +6,7 @@ from .calibration import Calibration from .config import ImageFormat +from .errors import K4AException from .transformation import ( color_image_to_depth_camera, depth_image_to_color_camera, @@ -25,6 +26,8 @@ def __init__( self._color: Optional[np.ndarray] = None self._color_timestamp_usec: int = 0 + self._color_exposure_usec: Optional[int] = None + self._color_white_balance: Optional[int] = None self._depth: Optional[np.ndarray] = None self._depth_timestamp_usec: int = 0 self._ir: Optional[np.ndarray] = None @@ -50,6 +53,24 @@ def color_timestamp_usec(self) -> int: self.color return self._color_timestamp_usec + @property + def color_exposure_usec(self) -> int: + if self._color_exposure_usec is None: + value = k4a_module.color_image_get_exposure_usec(self._capture_handle) + if value == 0: + raise K4AException("Cannot read exposure from color image") + self._color_exposure_usec = value + return self._color_exposure_usec + + @property + def color_white_balance(self) -> int: + if self._color_white_balance is None: + value = k4a_module.color_image_get_white_balance(self._capture_handle) + if value == 0: + raise K4AException("Cannot read white balance from color image") + self._color_white_balance = value + return self._color_white_balance + @property def depth(self) -> Optional[np.ndarray]: if self._depth is None: diff --git a/pyk4a/pyk4a.cpp b/pyk4a/pyk4a.cpp index d3d94ef..876ba52 100644 --- a/pyk4a/pyk4a.cpp +++ b/pyk4a/pyk4a.cpp @@ -3,12 +3,14 @@ #include #include +#include #include #ifdef __cplusplus extern "C" { #endif -// to debug, use fprintf(stdout, "debug msg\n") or fprintf(stderr, "debug msg\n");; +// to debug, use fprintf(stdout, "debug msg\n") or fprintf(stderr, "debug +// msg\n");; #define NON_THREAD_SAFE 0 // Simple way to map k4a_color_resolution_t to dimensions @@ -20,6 +22,7 @@ const char *CAPSULE_DEVICE_NAME = "pyk4a device handle"; const char *CAPSULE_CALIBRATION_NAME = "pyk4a calibration handle"; const char *CAPSULE_TRANSFORMATION_NAME = "pyk4a transformation handle"; const char *CAPSULE_CAPTURE_NAME = "pyk4a capture handle"; +const char *CAPSULE_RECORD_NAME = "pyk4a record handle"; static PyThreadState *_gil_release(int thread_safe) { PyThreadState *thread_state = NULL; @@ -66,6 +69,13 @@ static void capsule_cleanup_playback(PyObject *capsule) { free(playback_handle); } +static void capsule_cleanup_record(PyObject *capsule) { + k4a_record_t *record_handle; + + record_handle = (k4a_record_t *)PyCapsule_GetPointer(capsule, CAPSULE_RECORD_NAME); + free(record_handle); +} + static void capsule_cleanup_transformation(PyObject *capsule) { k4a_transformation_t *transformation = (k4a_transformation_t *)PyCapsule_GetPointer(capsule, CAPSULE_TRANSFORMATION_NAME); @@ -99,6 +109,48 @@ static PyObject *device_open(PyObject *self, PyObject *args) { return Py_BuildValue("IN", result, capsule); } +static PyObject *device_get_installed_count(PyObject *self, PyObject *args) { + uint32_t count; + count = k4a_device_get_installed_count(); + return Py_BuildValue("I", count); +} + +static PyObject *device_get_serialnum(PyObject *self, PyObject *args) { + k4a_device_t *device_handle; + PyObject *capsule; + int thread_safe; + PyThreadState *thread_state; + k4a_buffer_result_t result; + size_t data_size; + + PyArg_ParseTuple(args, "Op", &capsule, &thread_safe); + device_handle = (k4a_device_t *)PyCapsule_GetPointer(capsule, CAPSULE_DEVICE_NAME); + + thread_state = _gil_release(thread_safe); + result = k4a_device_get_serialnum(*device_handle, NULL, &data_size); + if (result == K4A_BUFFER_RESULT_FAILED) { + _gil_restore(thread_state); + return Py_BuildValue("s", ""); + } + char *data = (char *)malloc(data_size); + if (data == NULL) { + _gil_restore(thread_state); + fprintf(stderr, "Cannot allocate memory"); + return Py_BuildValue("s", ""); + } + result = k4a_device_get_serialnum(*device_handle, data, &data_size); + if (result != K4A_BUFFER_RESULT_SUCCEEDED) { + free(data); + return Py_BuildValue("s", ""); + } + _gil_restore(thread_state); + + PyObject *res = Py_BuildValue("s", data); + free(data); + + return res; +} + static PyObject *device_close(PyObject *self, PyObject *args) { k4a_device_t *device_handle; PyObject *capsule; @@ -510,6 +562,42 @@ k4a_result_t numpy_to_k4a_image(PyArrayObject *img_src, k4a_image_t *img_dst, k4 img_dst); } +static PyObject *color_image_get_exposure_usec(PyObject *self, PyObject *args) { + k4a_capture_t *capture_handle; + PyObject *capsule; + uint64_t exposure_usec = 0; + PyArg_ParseTuple(args, "O", &capsule); + capture_handle = (k4a_capture_t *)PyCapsule_GetPointer(capsule, CAPSULE_CAPTURE_NAME); + + k4a_image_t image = k4a_capture_get_color_image(*capture_handle); + if (image == NULL) { + fprintf(stderr, "Color image missed"); + return Py_BuildValue("K", exposure_usec); + } + + exposure_usec = k4a_image_get_exposure_usec(image); + k4a_image_release(image); + return Py_BuildValue("K", exposure_usec); +} + +static PyObject *color_image_get_white_balance(PyObject *self, PyObject *args) { + k4a_capture_t *capture_handle; + PyObject *capsule; + uint32_t white_balance = 0; + PyArg_ParseTuple(args, "O", &capsule); + capture_handle = (k4a_capture_t *)PyCapsule_GetPointer(capsule, CAPSULE_CAPTURE_NAME); + + k4a_image_t image = k4a_capture_get_color_image(*capture_handle); + if (image == NULL) { + fprintf(stderr, "Color image missed"); + return Py_BuildValue("I", white_balance); + } + + white_balance = k4a_image_get_white_balance(image); + k4a_image_release(image); + return Py_BuildValue("I", white_balance); +} + static PyObject *transformation_create(PyObject *self, PyObject *args) { k4a_calibration_t *calibration_handle; PyObject *capsule; @@ -947,6 +1035,49 @@ static PyObject *calibration_3d_to_2d(PyObject *self, PyObject *args) { return Py_BuildValue("II(ff)", res, valid, target_point2d.xy.x, target_point2d.xy.y); } +static PyObject *_array_to_list(float *array, size_t length) { + size_t i; + PyObject *result = NULL, *value = NULL; + + result = PyList_New(length); + if (result) { + for (i = 0; i < length; ++i) { + value = PyFloat_FromDouble(array[i]); + if (value) { + PyList_SET_ITEM(result, i, value); + } else { + Py_CLEAR(result); + break; + } + } + } + return result; +} + +static PyObject *calibration_get_intrinsics(PyObject *self, PyObject *args) { + k4a_calibration_t *calibration_handle; + PyObject *capsule; + int thread_safe; + k4a_calibration_type_t camera; + PyThreadState *thread_state; + + PyArg_ParseTuple(args, "OpI", &capsule, &thread_safe, &camera); + calibration_handle = (k4a_calibration_t *)PyCapsule_GetPointer(capsule, CAPSULE_CALIBRATION_NAME); + + thread_state = _gil_release(thread_safe); + + k4a_calibration_camera_t calib; + if (camera == K4A_CALIBRATION_TYPE_DEPTH) { + calib = calibration_handle->depth_camera_calibration; + } else if (camera == K4A_CALIBRATION_TYPE_COLOR) { + calib = calibration_handle->color_camera_calibration; + } + _gil_restore(thread_state); + + PyObject *intrinsics = _array_to_list(calib.intrinsics.parameters.v, calib.intrinsics.parameter_count); + return Py_BuildValue("N", intrinsics); +} + static PyObject *playback_open(PyObject *self, PyObject *args) { int thread_safe; PyThreadState *thread_state; @@ -964,7 +1095,6 @@ static PyObject *playback_open(PyObject *self, PyObject *args) { thread_state = _gil_release(thread_safe); result = k4a_playback_open(file_name, playback_handle); _gil_restore(thread_state); - if (result == K4A_RESULT_FAILED) { free(playback_handle); return Py_BuildValue("IN", result, Py_None); @@ -1170,6 +1300,103 @@ static PyObject *playback_get_previous_capture(PyObject *self, PyObject *args) { return Py_BuildValue("IN", result, capsule_capture); } +static PyObject *record_create(PyObject *self, PyObject *args) { + k4a_device_t *device_handle = NULL; + PyObject *device_capsule; + int thread_safe; + PyThreadState *thread_state; + const char *file_name; + + k4a_record_t *record_handle = (k4a_record_t *)malloc(sizeof(k4a_record_t)); + if (record_handle == NULL) { + fprintf(stderr, "Cannot allocate memory"); + return Py_BuildValue("IN", K4A_RESULT_FAILED, Py_None); + } + + k4a_device_configuration_t config = K4A_DEVICE_CONFIG_INIT_DISABLE_ALL; + PyArg_ParseTuple(args, "OspIIIIpiIIp", &device_capsule, &file_name, &thread_safe, &config.color_format, + &config.color_resolution, &config.depth_mode, &config.camera_fps, &config.synchronized_images_only, + &config.depth_delay_off_color_usec, &config.wired_sync_mode, + &config.subordinate_delay_off_master_usec, &config.disable_streaming_indicator); + + k4a_result_t result; + thread_state = _gil_release(thread_safe); + if (device_capsule != Py_None) { + device_handle = (k4a_device_t *)PyCapsule_GetPointer(device_capsule, CAPSULE_DEVICE_NAME); + result = k4a_record_create(file_name, *device_handle, config, record_handle); + } else { + result = k4a_record_create(file_name, NULL, config, record_handle); + } + _gil_restore(thread_state); + + if (result != K4A_RESULT_SUCCEEDED) { + free(record_handle); + return Py_BuildValue("IN", result, Py_None); + } + PyObject *capsule = PyCapsule_New(record_handle, CAPSULE_RECORD_NAME, capsule_cleanup_record); + return Py_BuildValue("IN", result, capsule); +} + +static PyObject *record_close(PyObject *self, PyObject *args) { + int thread_safe; + PyThreadState *thread_state; + PyObject *capsule; + k4a_record_t *record_handle; + PyArg_ParseTuple(args, "Op", &capsule, &thread_safe); + record_handle = (k4a_record_t *)PyCapsule_GetPointer(capsule, CAPSULE_RECORD_NAME); + thread_state = _gil_release(thread_safe); + k4a_record_close(*record_handle); + _gil_restore(thread_state); + return Py_BuildValue("I", K4A_RESULT_SUCCEEDED); +} + +static PyObject *record_write_header(PyObject *self, PyObject *args) { + int thread_safe; + PyThreadState *thread_state; + PyObject *capsule; + k4a_record_t *record_handle; + k4a_result_t result; + PyArg_ParseTuple(args, "Op", &capsule, &thread_safe); + record_handle = (k4a_record_t *)PyCapsule_GetPointer(capsule, CAPSULE_RECORD_NAME); + thread_state = _gil_release(thread_safe); + result = k4a_record_write_header(*record_handle); + _gil_restore(thread_state); + return Py_BuildValue("I", result); +} + +static PyObject *record_flush(PyObject *self, PyObject *args) { + int thread_safe; + PyThreadState *thread_state; + PyObject *capsule; + k4a_record_t *record_handle; + k4a_result_t result; + PyArg_ParseTuple(args, "Op", &capsule, &thread_safe); + record_handle = (k4a_record_t *)PyCapsule_GetPointer(capsule, CAPSULE_RECORD_NAME); + thread_state = _gil_release(thread_safe); + result = k4a_record_flush(*record_handle); + _gil_restore(thread_state); + return Py_BuildValue("I", result); +} + +static PyObject *record_write_capture(PyObject *self, PyObject *args) { + int thread_safe; + PyThreadState *thread_state; + PyObject *record_capsule; + k4a_record_t *record_handle; + PyObject *capture_capsule; + k4a_capture_t *capture_handle; + k4a_result_t result; + PyArg_ParseTuple(args, "OOp", &record_capsule, &capture_capsule, &thread_safe); + record_handle = (k4a_record_t *)PyCapsule_GetPointer(record_capsule, CAPSULE_RECORD_NAME); + capture_handle = (k4a_capture_t *)PyCapsule_GetPointer(capture_capsule, CAPSULE_CAPTURE_NAME); + + thread_state = _gil_release(thread_safe); + result = k4a_record_write_capture(*record_handle, *capture_handle); + _gil_restore(thread_state); + + return Py_BuildValue("I", result); +} + struct module_state { PyObject *error; }; @@ -1190,7 +1417,8 @@ static PyMethodDef Pyk4aMethods[] = { {"device_get_imu_sample", device_get_imu_sample, METH_VARARGS, "Reads an imu sample"}, {"device_close", device_close, METH_VARARGS, "Close an Azure Kinect device"}, {"device_get_sync_jack", device_get_sync_jack, METH_VARARGS, - "Get the device jack status for the synchronization in and synchronization out connectors."}, + "Get the device jack status for the synchronization in and " + "synchronization out connectors."}, {"device_get_color_control", device_get_color_control, METH_VARARGS, "Get device color control."}, {"device_set_color_control", device_set_color_control, METH_VARARGS, "Set device color control."}, {"device_get_color_control_capabilities", device_get_color_control_capabilities, METH_VARARGS, @@ -1204,7 +1432,9 @@ static PyMethodDef Pyk4aMethods[] = { {"transformation_depth_image_to_color_camera", transformation_depth_image_to_color_camera, METH_VARARGS, "Transforms the depth map into the geometry of the color camera."}, {"transformation_depth_image_to_color_camera_custom", transformation_depth_image_to_color_camera_custom, - METH_VARARGS, "Transforms the custom & depth map into the geometry of the color camera."}, + METH_VARARGS, + "Transforms the custom & depth map into the geometry of the color " + "camera."}, {"transformation_color_image_to_depth_camera", transformation_color_image_to_depth_camera, METH_VARARGS, "Transforms the color image into the geometry of the depth camera."}, {"transformation_depth_image_to_point_cloud", transformation_depth_image_to_point_cloud, METH_VARARGS, @@ -1214,6 +1444,8 @@ static PyMethodDef Pyk4aMethods[] = { "Transforms the coordinates between a pixel and a 3D system"}, {"calibration_3d_to_2d", calibration_3d_to_2d, METH_VARARGS, "Transform a 3D point of a source coordinate system into a 2D pixel coordinate of the target camera"}, + {"calibration_get_intrinsics", calibration_get_intrinsics, METH_VARARGS, + "Gets intrinsic parameters from calibration"}, {"playback_open", playback_open, METH_VARARGS, "Open file for playback"}, {"playback_close", playback_close, METH_VARARGS, "Close opened playback"}, {"playback_get_recording_length_usec", playback_get_recording_length_usec, METH_VARARGS, "Return recording length"}, @@ -1227,6 +1459,16 @@ static PyMethodDef Pyk4aMethods[] = { {"playback_get_next_capture", playback_get_next_capture, METH_VARARGS, "Get next capture from playback"}, {"playback_get_previous_capture", playback_get_previous_capture, METH_VARARGS, "Get previous capture from playback"}, + {"color_image_get_exposure_usec", color_image_get_exposure_usec, METH_VARARGS, + "Get color image exposure in microseconds"}, + {"color_image_get_white_balance", color_image_get_white_balance, METH_VARARGS, "Get color image white balance"}, + {"record_create", record_create, METH_VARARGS, "Opens a new recording file for writing"}, + {"record_close", record_close, METH_VARARGS, "Opens a new recording file for writing"}, + {"record_write_header", record_write_header, METH_VARARGS, "Writes the recording header and metadata to file"}, + {"record_flush", record_flush, METH_VARARGS, "Flushes all pending recording data to disk"}, + {"record_write_capture", record_write_capture, METH_VARARGS, "Writes a camera capture to file"}, + {"device_get_installed_count", device_get_installed_count, METH_VARARGS, "Gets the number of connected devices"}, + {"device_get_serialnum", device_get_serialnum, METH_VARARGS, "Get the Azure Kinect device serial number."}, {NULL, NULL, 0, NULL}}; @@ -1243,6 +1485,7 @@ static int pyk4a_clear(PyObject *m) { static struct PyModuleDef moduledef = { PyModuleDef_HEAD_INIT, "k4a_module", NULL, sizeof(struct module_state), Pyk4aMethods, NULL, pyk4a_traverse, pyk4a_clear, NULL}; + #define INITERROR return NULL PyMODINIT_FUNC PyInit_k4a_module(void) { import_array(); diff --git a/pyk4a/pyk4a.py b/pyk4a/pyk4a.py index bf9872a..21d01a2 100644 --- a/pyk4a/pyk4a.py +++ b/pyk4a/pyk4a.py @@ -140,6 +140,14 @@ def get_imu_sample(self, timeout: int = TIMEOUT_WAIT_INFINITE) -> Optional["ImuS _verify_error(res) return imu_sample + @property + def serial(self) -> str: + self._validate_is_opened() + ret = k4a_module.device_get_serialnum(self._device_handle, self.thread_safe) + if ret == "": + raise K4AException("Cannot read serial") + return ret + @property def calibration_raw(self) -> str: self._validate_is_opened() diff --git a/pyk4a/record.py b/pyk4a/record.py new file mode 100644 index 0000000..c13040f --- /dev/null +++ b/pyk4a/record.py @@ -0,0 +1,92 @@ +from pathlib import Path +from typing import Optional, Union + +import k4a_module + +from .capture import PyK4ACapture +from .config import Config +from .errors import K4AException +from .pyk4a import PyK4A +from .results import Result + + +class PyK4ARecord: + def __init__( + self, path: Union[str, Path], config: Config, device: Optional[PyK4A] = None, thread_safe: bool = True + ): + self._path: Path = Path(path) + self.thread_safe = thread_safe + self._device: Optional[PyK4A] = device + self._config: Config = config + self._handle: Optional[object] = None + self._header_written: bool = False + self._captures_count: int = 0 + + def __del__(self): + if self.created: + self.close() + + def create(self) -> None: + """ Create record file """ + if self.created: + raise K4AException(f"Record already created {self._path}") + device_handle = self._device._device_handle if self._device else None + result, handle = k4a_module.record_create( + device_handle, str(self._path), self.thread_safe, *self._config.unpack() + ) + if result != Result.Success: + raise K4AException(f"Cannot create record {self._path}") + self._handle = handle + + def close(self): + """ Close record """ + self._validate_is_created() + k4a_module.record_close(self._handle, self.thread_safe) + self._handle = None + + def write_header(self): + """ Write MKV header """ + self._validate_is_created() + if self.header_written: + raise K4AException(f"Header already written {self._path}") + result: Result = k4a_module.record_write_header(self._handle, self.thread_safe) + if result != Result.Success: + raise K4AException(f"Cannot write record header {self._path}") + self._header_written = True + + def write_capture(self, capture: PyK4ACapture): + """ Write capture to file (send to queue) """ + self._validate_is_created() + if not self.header_written: + self.write_header() + result: Result = k4a_module.record_write_capture(self._handle, capture._capture_handle, self.thread_safe) + if result != Result.Success: + raise K4AException(f"Cannot write capture {self._path}") + self._captures_count += 1 + + def flush(self): + """ Flush queue""" + self._validate_is_created() + result: Result = k4a_module.record_flush(self._handle, self.thread_safe) + if result != Result.Success: + raise K4AException(f"Cannot flush data {self._path}") + + @property + def created(self) -> bool: + return self._handle is not None + + @property + def header_written(self) -> bool: + return self._header_written + + @property + def captures_count(self) -> int: + return self._captures_count + + @property + def path(self) -> Path: + return self._path + + def _validate_is_created(self): + if not self.created: + raise K4AException("Record not created.") diff --git a/setup.py b/setup.py index 85962b6..966c677 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ def __str__(self): libraries=['k4a', 'k4arecord']) setup(name='pyk4a', - version='1.1.0', + version='1.2.0', description='Python wrapper over Azure Kinect SDK', license='GPL-3.0', author='Etienne Dubeau', diff --git a/tests/conftest.py b/tests/conftest.py index fe341a2..ddf8f55 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,23 @@ +from pathlib import Path + +import pytest + +from pyk4a import PyK4APlayback + + pytest_plugins = [ "tests.plugins.calibration", "tests.plugins.capture", "tests.plugins.device", "tests.plugins.playback", ] + + +@pytest.fixture() +def recording_path() -> Path: + return Path(__file__).parent / "assets" / "recording.mkv" + + +@pytest.fixture() +def playback(recording_path: Path) -> PyK4APlayback: + return PyK4APlayback(path=recording_path) diff --git a/tests/functional/test_capture.py b/tests/functional/test_capture.py index 9838be1..5f52d44 100644 --- a/tests/functional/test_capture.py +++ b/tests/functional/test_capture.py @@ -21,3 +21,5 @@ def test_device_capture_images(device: PyK4A): assert np.array_equal(color, depth) is False assert np.array_equal(depth, ir) is False assert np.array_equal(ir, color) is False + assert capture.color_white_balance is not None + assert capture.color_exposure_usec is not None diff --git a/tests/functional/test_device.py b/tests/functional/test_device.py index 5270128..56139a7 100644 --- a/tests/functional/test_device.py +++ b/tests/functional/test_device.py @@ -1,6 +1,6 @@ import pytest -from pyk4a import K4AException, PyK4A +from pyk4a import K4AException, PyK4A, connected_device_count class TestOpenClose: @@ -56,6 +56,13 @@ def test_get_calibration(device: PyK4A): calibration = device.calibration assert calibration._calibration_handle + @staticmethod + @pytest.mark.device + def test_serial(device: PyK4A): + device.open() + serial = device.serial + assert len(serial) > 5 + class TestCameras: @staticmethod @@ -108,3 +115,10 @@ def test_calibration_raw(device: PyK4A): print(raw, file=sys.stderr) assert raw + + +class TestInstalledCount: + @staticmethod + def test_count(): + count = connected_device_count() + assert count >= 0 diff --git a/tests/functional/test_playback.py b/tests/functional/test_playback.py index e3090f8..0352a6e 100644 --- a/tests/functional/test_playback.py +++ b/tests/functional/test_playback.py @@ -1,5 +1,3 @@ -from pathlib import Path - import pytest from pyk4a import K4AException, PyK4APlayback, SeekOrigin @@ -25,16 +23,6 @@ ) -@pytest.fixture() -def recording_path() -> Path: - return Path(__file__).parent.parent / "assets" / "recording.mkv" - - -@pytest.fixture() -def playback(recording_path: Path) -> PyK4APlayback: - return PyK4APlayback(path=recording_path) - - class TestInit: @staticmethod def test_not_existing_path(): diff --git a/tests/functional/test_record.py b/tests/functional/test_record.py new file mode 100644 index 0000000..9ecdb45 --- /dev/null +++ b/tests/functional/test_record.py @@ -0,0 +1,52 @@ +from pathlib import Path + +import pytest + +from pyk4a import Config, ImageFormat, PyK4ACapture, PyK4APlayback, PyK4ARecord + + +@pytest.fixture() +def record_path(tmp_path: Path) -> Path: + return tmp_path / "record.mkv" + + +@pytest.fixture() +def record(record_path: Path) -> PyK4ARecord: + return PyK4ARecord(config=Config(color_format=ImageFormat.COLOR_MJPG), path=record_path) + + +@pytest.fixture() +def created_record(record: PyK4ARecord) -> PyK4ARecord: + record.create() + return record + + +@pytest.fixture() +def capture(playback: PyK4APlayback) -> PyK4ACapture: + playback.open() + return playback.get_next_capture() + + +class TestCreate: + @staticmethod + def test_create(record: PyK4ARecord): + record.create() + assert record.path.exists() + + +class TestClose: + @staticmethod + def test_closing(created_record: PyK4ARecord): + created_record.close() + assert created_record.path.exists() + + +class TestFlush: + @staticmethod + def test_file_size_increased(created_record: PyK4ARecord, capture: PyK4ACapture): + created_record.write_header() + size_before = created_record.path.stat().st_size + created_record.write_capture(capture) + created_record.flush() + size_after = created_record.path.stat().st_size + assert size_after > size_before diff --git a/tests/plugins/device.py b/tests/plugins/device.py index 00fe418..7d093f9 100644 --- a/tests/plugins/device.py +++ b/tests/plugins/device.py @@ -20,6 +20,7 @@ class DeviceMeta: id: int jack_in: bool = False jack_out: bool = False + serial: str = "123456789" color_controls: Tuple[ColorControlCapabilities, ...] = ( ColorControlCapabilities( color_control_command=ColorControlCommand.EXPOSURE_TIME_ABSOLUTE, @@ -246,6 +247,10 @@ def device_get_raw_calibration(self) -> Optional[str]: assert self._opened is True return "{}" + def device_get_serialnum(self) -> str: + assert self._opened is True + return self._meta.serial + def _device_open(device_id: int, thread_safe: bool) -> Tuple[int, object]: if device_id not in DEVICE_METAS: return Result.Failed.value, None @@ -310,6 +315,12 @@ def _device_get_calibration( def _device_get_raw_calibration(capsule: DeviceHandle, thread_safe) -> Optional[str]: return capsule.device_get_raw_calibration() + def _device_get_installed_count() -> int: + return 1 + + def _device_get_serialnum(capsule: DeviceHandle, thread_safe) -> Optional[str]: + return capsule.device_get_serialnum() + monkeypatch.setattr("k4a_module.device_open", _device_open) monkeypatch.setattr("k4a_module.device_close", _device_close) monkeypatch.setattr("k4a_module.device_get_sync_jack", _device_get_sync_jack) @@ -324,6 +335,8 @@ def _device_get_raw_calibration(capsule: DeviceHandle, thread_safe) -> Optional[ monkeypatch.setattr("k4a_module.device_get_imu_sample", _device_get_imu_sample) monkeypatch.setattr("k4a_module.device_get_calibration", _device_get_calibration) monkeypatch.setattr("k4a_module.device_get_raw_calibration", _device_get_raw_calibration) + monkeypatch.setattr("k4a_module.device_get_installed_count", _device_get_installed_count) + monkeypatch.setattr("k4a_module.device_get_serialnum", _device_get_serialnum) @pytest.fixture() diff --git a/tests/unit/test_device.py b/tests/unit/test_device.py index 28d71a9..ca102b9 100644 --- a/tests/unit/test_device.py +++ b/tests/unit/test_device.py @@ -2,7 +2,7 @@ import pytest -from pyk4a import K4AException, PyK4A +from pyk4a import K4AException, PyK4A, connected_device_count DEVICE_ID = 0 @@ -139,6 +139,12 @@ def test_calibration(device: PyK4A): calibration = device.calibration assert calibration + @staticmethod + def test_serial(device: PyK4A): + device.open() + serial = device.serial + assert serial == "123456789" + class TestCameras: @staticmethod @@ -179,3 +185,10 @@ def test_calibration_raw_on_closed_device(device: PyK4A): def test_calibration_raw(device: PyK4A): device.open() assert device.calibration_raw + + +class TestInstalledCount: + @staticmethod + def test_count(patch_module_device): + count = connected_device_count() + assert count == 1 diff --git a/tests/unit/test_record.py b/tests/unit/test_record.py new file mode 100644 index 0000000..9c21ad3 --- /dev/null +++ b/tests/unit/test_record.py @@ -0,0 +1,96 @@ +from pathlib import Path + +import pytest + +from pyk4a import Config, ImageFormat, K4AException, PyK4ACapture, PyK4APlayback, PyK4ARecord + + +@pytest.fixture() +def record_path(tmp_path: Path) -> Path: + return tmp_path / "record.mkv" + + +@pytest.fixture() +def record(record_path: Path) -> PyK4ARecord: + return PyK4ARecord(config=Config(color_format=ImageFormat.COLOR_MJPG), path=record_path) + + +@pytest.fixture() +def created_record(record: PyK4ARecord) -> PyK4ARecord: + record.create() + return record + + +@pytest.fixture() +def capture(playback: PyK4APlayback) -> PyK4ACapture: + playback.open() + return playback.get_next_capture() + + +class TestCreate: + @staticmethod + def test_bad_path(tmp_path: Path): + path = tmp_path / "not-exists" / "file.mkv" + record = PyK4ARecord(config=Config(), path=path) + with pytest.raises(K4AException, match=r"Cannot create"): + record.create() + assert not record.created + + @staticmethod + def test_create(record: PyK4ARecord): + record.create() + assert record.created + + @staticmethod + def test_recreate(created_record: PyK4ARecord): + with pytest.raises(K4AException, match=r"already"): + created_record.create() + + +class TestClose: + @staticmethod + def test_not_created_record(record: PyK4ARecord): + with pytest.raises(K4AException, match=r"not created"): + record.close() + + @staticmethod + def test_closing(created_record: PyK4ARecord): + created_record.close() + assert not created_record.created + + +class TestWriteHeader: + @staticmethod + def test_not_created_record(record: PyK4ARecord): + with pytest.raises(K4AException, match=r"not created"): + record.write_header() + + @staticmethod + def test_double_writing(created_record: PyK4ARecord): + created_record.write_header() + with pytest.raises(K4AException, match=r"already written"): + created_record.write_header() + + +class TestWriteCapture: + @staticmethod + def test_not_created_record(record: PyK4ARecord, capture: PyK4ACapture): + with pytest.raises(K4AException, match=r"not created"): + record.write_capture(capture) + + @staticmethod + def test_header_created(created_record: PyK4ARecord, capture: PyK4ACapture): + created_record.write_capture(capture) + assert created_record.header_written + + @staticmethod + def test_captures_count_increased(created_record: PyK4ARecord, capture: PyK4ACapture): + created_record.write_capture(capture) + assert created_record.captures_count == 1 + + +class TestFlush: + @staticmethod + def test_not_created_record(record: PyK4ARecord): + with pytest.raises(K4AException, match=r"not created"): + record.flush()