diff --git a/adafruit_wiznet5k/adafruit_wiznet5k.py b/adafruit_wiznet5k/adafruit_wiznet5k.py index 7d1baa1..9bb1f57 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k.py @@ -187,7 +187,8 @@ def __init__( # attempt to initialize the module self._ch_base_msb = 0 - assert self._w5xxx_init() == 1, "Failed to initialize WIZnet module." + if self._w5xxx_init() != 1: + raise RuntimeError("Failed to initialize WIZnet module.") # Set MAC address self.mac_address = mac self.src_port = 0 @@ -214,7 +215,8 @@ def __init__( ret = self.set_dhcp(hostname, dhcp_timeout) if ret != 0: self._dhcp_client = None - assert ret == 0, "Failed to configure DHCP Server!" + if ret != 0: + raise RuntimeError("Failed to configure DHCP Server!") def set_dhcp( self, hostname: Optional[str] = None, response_timeout: float = 30 @@ -272,7 +274,8 @@ def get_host_by_name(self, hostname: str) -> bytes: ret = _dns_client.gethostbyname(hostname) if self._debug: print("* Resolved IP: ", ret) - assert ret != -1, "Failed to resolve hostname!" + if ret == -1: + raise RuntimeError("Failed to resolve hostname!") return ret @property @@ -626,7 +629,8 @@ def socket_available(self, socket_num: int, sock_type: int = SNMR_TCP) -> int: socket_num, sock_type ) ) - assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets." + if socket_num > self.max_sockets: + raise ValueError("Provided socket exceeds max_sockets.") res = self._get_rx_rcv_size(socket_num) @@ -679,7 +683,8 @@ def socket_connect( :param int conn_mode: The connection mode. Use SNMR_TCP for TCP or SNMR_UDP for UDP, defaults to SNMR_TCP. """ - assert self.link_status, "Ethernet cable disconnected!" + if not self.link_status: + raise ConnectionError("Ethernet cable disconnected!") if self._debug: print( "* w5k socket connect, protocol={}, port={}, ip={}".format( @@ -689,7 +694,7 @@ def socket_connect( # initialize a socket and set the mode res = self.socket_open(socket_num, conn_mode=conn_mode) if res == 1: - raise RuntimeError("Failed to initialize a connection with the socket.") + raise ConnectionError("Failed to initialize a connection with the socket.") # set socket destination IP and port self._write_sndipr(socket_num, dest) @@ -703,7 +708,7 @@ def socket_connect( if self._debug: print("SN_SR:", self.socket_status(socket_num)[0]) if self.socket_status(socket_num)[0] == SNSR_SOCK_CLOSED: - raise RuntimeError("Failed to establish connection.") + raise ConnectionError("Failed to establish connection.") elif conn_mode == SNMR_UDP: self.udp_datasize[socket_num] = 0 return 1 @@ -747,7 +752,8 @@ def socket_listen( :param int conn_mode: Connection mode SNMR_TCP for TCP or SNMR_UDP for UDP, defaults to SNMR_TCP. """ - assert self.link_status, "Ethernet cable disconnected!" + if not self.link_status: + raise ConnectionError("Ethernet cable disconnected!") if self._debug: print( "* Listening on port={}, ip={}".format( @@ -807,7 +813,8 @@ def socket_open(self, socket_num: int, conn_mode: int = SNMR_TCP) -> int: UDP, defaults to SNMR_TCP. :return int: 1 if the socket was opened, 0 if not. """ - assert self.link_status, "Ethernet cable disconnected!" + if not self.link_status: + raise ConnectionError("Ethernet cable disconnected!") if self._debug: print("*** Opening socket %d" % socket_num) status = self._read_snsr(socket_num)[0] @@ -839,10 +846,8 @@ def socket_open(self, socket_num: int, conn_mode: int = SNMR_TCP) -> int: # open socket self._write_sncr(socket_num, CMD_SOCK_OPEN) self._read_sncr(socket_num) - assert ( - self._read_snsr((socket_num))[0] == 0x13 - or self._read_snsr((socket_num))[0] == 0x22 - ), "Could not open socket in TCP or UDP mode." + if self._read_snsr((socket_num))[0] not in [0x13, 0x22]: + raise RuntimeError("Could not open socket in TCP or UDP mode.") return 0 return 1 @@ -868,7 +873,7 @@ def socket_disconnect(self, socket_num: int) -> None: self._write_sncr(socket_num, CMD_SOCK_DISCON) self._read_sncr(socket_num) - def socket_read( + def socket_read( # pylint: disable=too-many-branches self, socket_num: int, length: int ) -> Tuple[int, Union[int, bytearray]]: """ @@ -882,8 +887,11 @@ def socket_read( was unsuccessful then both items equal an error code, 0 for no data waiting and -1 for no connection to the socket. """ - assert self.link_status, "Ethernet cable disconnected!" - assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets." + + if not self.link_status: + raise ConnectionError("Ethernet cable disconnected!") + if socket_num > self.max_sockets: + raise ValueError("Provided socket exceeds max_sockets.") # Check if there is data available on the socket ret = self._get_rx_rcv_size(socket_num) @@ -976,7 +984,8 @@ def socket_write( :return int: The number of bytes written to the buffer. """ - assert self.link_status, "Ethernet cable disconnected!" + if not self.link_status: + raise ConnectionError("Ethernet cable disconnected!") assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets." status = 0 ret = 0 diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py b/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py index 3243f77..b07ff9d 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py @@ -274,10 +274,11 @@ def parse_dhcp_response( # -- Parse Packet, FIXED -- # # Validate OP - assert ( - _BUFF[0] == DHCP_BOOT_REPLY - ), "Malformed Packet - \ + if _BUFF[0] != DHCP_BOOT_REPLY: + raise RuntimeError( + "Malformed Packet - \ DHCP message OP is not expected BOOT Reply." + ) xid = _BUFF[4:8] if bytes(xid) < self._initial_xid: diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_socket.py b/adafruit_wiznet5k/adafruit_wiznet5k_socket.py index 01a372a..b34cb08 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_socket.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_socket.py @@ -101,7 +101,7 @@ def getaddrinfo( :return List[Tuple[int, int, int, str, Tuple[str, int]]]: Address info entries. """ if not isinstance(port, int): - raise RuntimeError("Port must be an integer") + raise ValueError("Port must be an integer") if is_ipv4(host): return [(AF_INET, socktype, proto, "", (host, port))] return [(AF_INET, socktype, proto, "", (gethostbyname(host), port))] @@ -283,7 +283,8 @@ def listen(self, backlog: Optional[int] = None) -> None: :param Optional[int] backlog: Included for compatibility but ignored. """ - assert self._listen_port is not None, "Use bind to set the port before listen!" + if self._listen_port is None: + raise RuntimeError("Use bind to set the port before listen!") _the_interface.socket_listen(self.socknum, self._listen_port) self._buffer = b"" @@ -340,9 +341,10 @@ def connect( :param Optional[int] conntype: Raises an exception if set to 3, unused otherwise, defaults to None. """ - assert ( - conntype != 0x03 - ), "Error: SSL/TLS is not currently supported by CircuitPython." + if conntype == 0x03: + raise NotImplementedError( + "Error: SSL/TLS is not currently supported by CircuitPython." + ) host, port = address if hasattr(host, "split"): @@ -565,7 +567,8 @@ def readline(self) -> bytes: def disconnect(self) -> None: """Disconnect a TCP socket.""" - assert self._sock_type == SOCK_STREAM, "Socket must be a TCP socket." + if self._sock_type != SOCK_STREAM: + raise RuntimeError("Socket must be a TCP socket.") _the_interface.socket_disconnect(self.socknum) def close(self) -> None: