From 8bd9fc908437a5133b949be8f421c168c23b995b Mon Sep 17 00:00:00 2001 From: Franz Haas Date: Sat, 14 Dec 2024 19:00:43 +0100 Subject: [PATCH] - missed a file --- atests/nfc_nci/nci_interface.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/atests/nfc_nci/nci_interface.py b/atests/nfc_nci/nci_interface.py index b1e57c1..e00474c 100644 --- a/atests/nfc_nci/nci_interface.py +++ b/atests/nfc_nci/nci_interface.py @@ -1,4 +1,5 @@ import serial +import select import time @@ -6,16 +7,23 @@ class nci_interface(): def __init__(self): self._serial_connection = None - def open_nci_connection_via_uart(self, device: str, baudrate: int): + def open_nci_connection_via_uart(self, device: str, baudrate: int, timeout=1.0): """ Opens a connection to a NCI device via UART. Returns a tuple of two file objects, first one for reading and second one for writing. """ - self._serial_connection = serial.Serial(device, baudrate, timeout=1) + class timeoutExceptionOnTimeoutSerial(serial.Serial): + def read(self, size=1): + retBuf = super().read(size) + assert len(retBuf) >= size, f"Timeout while reading from serial port, received {len(retBuf)} bytes instead of {size}" + assert len(retBuf) == size, f"Received more data ({len(retBuf)}) then requested ({size}). " + return retBuf + + self._serial_connection = timeoutExceptionOnTimeoutSerial(device, baudrate, timeout=timeout) self._serial_connection.reset_input_buffer() - return (open(self._serial_connection.fd, "rb", buffering=0, closefd=False), - open(self._serial_connection.fd, "wb", buffering=0, closefd=False),) + return self._serial_connection # Alternatively, on linux we can use self._serial_connection.fileno() + select instead of the timeout read... + def wait_for_data_from_nci(self, timeout: float = 1.0): """ @@ -24,12 +32,14 @@ def wait_for_data_from_nci(self, timeout: float = 1.0): Raises an exception if the NCI connection is not open. """ if self._serial_connection and self._serial_connection.is_open: - endTime = time.time() + timeout - while time.time() < endTime: - if self._serial_connection.in_waiting: - return - time.sleep(0.001) - assert self._serial_connection.in_waiting, "Timeout while waiting for data from NCI device" + try: + select.select([self._serial_connection.fileno()], [], [], timeout) + except AttributeError: + # Windows does not support select on serial ports, so we have to do it the hard way + endTime = time.time() + timeout + while time.time() < endTime and not self._serial_connection.in_waiting: + time.sleep(0.001) + assert self._serial_connection.in_waiting, "Timeout while waiting for data from NCI device" else: raise Exception("NCI connection is not open")