diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index a6d8fb84b..3a25c99d8 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -15,6 +15,7 @@ Any, Callable, Dict, + Iterator, List, NamedTuple, Optional, @@ -205,7 +206,10 @@ def __init__( self._can_protocol = CanProtocol.CAN_FD if is_fd else CanProtocol.CAN_20 for channel in self.channels: - if (_channel_index := kwargs.get("channel_index", None)) is not None: + if ( + len(self.channels) == 1 + and (_channel_index := kwargs.get("channel_index", None)) is not None + ): # VectorBus._detect_available_configs() might return multiple # devices with the same serial number, e.g. if a VN8900 is connected via both USB and Ethernet # at the same time. If the VectorBus is instantiated with a config, that was returned from @@ -250,42 +254,62 @@ def __init__( self.permission_mask = permission_mask.value LOG.debug( - "Open Port: PortHandle: %d, PermissionMask: 0x%X", + "Open Port: PortHandle: %d, ChannelMask: 0x%X, PermissionMask: 0x%X", self.port_handle.value, - permission_mask.value, + self.mask, + self.permission_mask, ) + assert_timing = (bitrate or timing) and not self.__testing + # set CAN settings - for channel in self.channels: - if isinstance(timing, BitTiming): - timing = check_or_adjust_timing_clock(timing, [16_000_000, 8_000_000]) - self._set_bit_timing( - channel=channel, - timing=timing, + if isinstance(timing, BitTiming): + timing = check_or_adjust_timing_clock(timing, [16_000_000, 8_000_000]) + self._set_bit_timing(channel_mask=self.mask, timing=timing) + if assert_timing: + self._check_can_settings( + channel_mask=self.mask, + bitrate=timing.bitrate, + sample_point=timing.sample_point, ) - elif isinstance(timing, BitTimingFd): - timing = check_or_adjust_timing_clock(timing, [80_000_000]) - self._set_bit_timing_fd( - channel=channel, - timing=timing, + elif isinstance(timing, BitTimingFd): + timing = check_or_adjust_timing_clock(timing, [80_000_000]) + self._set_bit_timing_fd(channel_mask=self.mask, timing=timing) + if assert_timing: + self._check_can_settings( + channel_mask=self.mask, + bitrate=timing.nom_bitrate, + sample_point=timing.nom_sample_point, + fd=True, + data_bitrate=timing.data_bitrate, + data_sample_point=timing.data_sample_point, ) - elif fd: - self._set_bit_timing_fd( - channel=channel, - timing=BitTimingFd.from_bitrate_and_segments( - f_clock=80_000_000, - nom_bitrate=bitrate or 500_000, - nom_tseg1=tseg1_abr, - nom_tseg2=tseg2_abr, - nom_sjw=sjw_abr, - data_bitrate=data_bitrate or bitrate or 500_000, - data_tseg1=tseg1_dbr, - data_tseg2=tseg2_dbr, - data_sjw=sjw_dbr, - ), + elif fd: + timing = BitTimingFd.from_bitrate_and_segments( + f_clock=80_000_000, + nom_bitrate=bitrate or 500_000, + nom_tseg1=tseg1_abr, + nom_tseg2=tseg2_abr, + nom_sjw=sjw_abr, + data_bitrate=data_bitrate or bitrate or 500_000, + data_tseg1=tseg1_dbr, + data_tseg2=tseg2_dbr, + data_sjw=sjw_dbr, + ) + self._set_bit_timing_fd(channel_mask=self.mask, timing=timing) + if assert_timing: + self._check_can_settings( + channel_mask=self.mask, + bitrate=timing.nom_bitrate, + sample_point=timing.nom_sample_point, + fd=True, + data_bitrate=timing.data_bitrate, + data_sample_point=timing.data_sample_point, ) - elif bitrate: - self._set_bitrate(channel=channel, bitrate=bitrate) + elif bitrate: + self._set_bitrate(channel_mask=self.mask, bitrate=bitrate) + if assert_timing: + self._check_can_settings(channel_mask=self.mask, bitrate=bitrate) # Enable/disable TX receipts tx_receipts = 1 if receive_own_messages else 0 @@ -405,45 +429,41 @@ def _find_global_channel_idx( def _has_init_access(self, channel: int) -> bool: return bool(self.permission_mask & self.channel_masks[channel]) - def _read_bus_params(self, channel: int) -> "VectorBusParams": - channel_mask = self.channel_masks[channel] - - vcc_list = get_channel_configs() + def _read_bus_params( + self, channel_index: int, vcc_list: List["VectorChannelConfig"] + ) -> "VectorBusParams": for vcc in vcc_list: - if vcc.channel_mask == channel_mask: + if vcc.channel_index == channel_index: bus_params = vcc.bus_params if bus_params is None: # for CAN channels, this should never be `None` raise ValueError("Invalid bus parameters.") return bus_params + channel = self.index_to_channel[channel_index] raise CanInitializationError( f"Channel configuration for channel {channel} not found." ) - def _set_bitrate(self, channel: int, bitrate: int) -> None: - # set parameters if channel has init access - if self._has_init_access(channel): + def _set_bitrate(self, channel_mask: int, bitrate: int) -> None: + # set parameters for channels with init access + channel_mask = channel_mask & self.permission_mask + if channel_mask: self.xldriver.xlCanSetChannelBitrate( self.port_handle, - self.channel_masks[channel], + channel_mask, bitrate, ) LOG.info("xlCanSetChannelBitrate: baudr.=%u", bitrate) - if not self.__testing: - self._check_can_settings( - channel=channel, - bitrate=bitrate, - ) - - def _set_bit_timing(self, channel: int, timing: BitTiming) -> None: - # set parameters if channel has init access - if self._has_init_access(channel): + def _set_bit_timing(self, channel_mask: int, timing: BitTiming) -> None: + # set parameters for channels with init access + channel_mask = channel_mask & self.permission_mask + if channel_mask: if timing.f_clock == 8_000_000: self.xldriver.xlCanSetChannelParamsC200( self.port_handle, - self.channel_masks[channel], + channel_mask, timing.btr0, timing.btr1, ) @@ -461,7 +481,7 @@ def _set_bit_timing(self, channel: int, timing: BitTiming) -> None: chip_params.sam = timing.nof_samples self.xldriver.xlCanSetChannelParams( self.port_handle, - self.channel_masks[channel], + channel_mask, chip_params, ) LOG.info( @@ -476,20 +496,14 @@ def _set_bit_timing(self, channel: int, timing: BitTiming) -> None: f"timing.f_clock must be 8_000_000 or 16_000_000 (is {timing.f_clock})" ) - if not self.__testing: - self._check_can_settings( - channel=channel, - bitrate=timing.bitrate, - sample_point=timing.sample_point, - ) - def _set_bit_timing_fd( self, - channel: int, + channel_mask: int, timing: BitTimingFd, ) -> None: - # set parameters if channel has init access - if self._has_init_access(channel): + # set parameters for channels with init access + channel_mask = channel_mask & self.permission_mask + if channel_mask: canfd_conf = xlclass.XLcanFdConf() canfd_conf.arbitrationBitRate = timing.nom_bitrate canfd_conf.sjwAbr = timing.nom_sjw @@ -500,7 +514,7 @@ def _set_bit_timing_fd( canfd_conf.tseg1Dbr = timing.data_tseg1 canfd_conf.tseg2Dbr = timing.data_tseg2 self.xldriver.xlCanFdSetConfiguration( - self.port_handle, self.channel_masks[channel], canfd_conf + self.port_handle, channel_mask, canfd_conf ) LOG.info( "xlCanFdSetConfiguration.: ABaudr.=%u, DBaudr.=%u", @@ -520,19 +534,9 @@ def _set_bit_timing_fd( canfd_conf.tseg2Dbr, ) - if not self.__testing: - self._check_can_settings( - channel=channel, - bitrate=timing.nom_bitrate, - sample_point=timing.nom_sample_point, - fd=True, - data_bitrate=timing.data_bitrate, - data_sample_point=timing.data_sample_point, - ) - def _check_can_settings( self, - channel: int, + channel_mask: int, bitrate: int, sample_point: Optional[float] = None, fd: bool = False, @@ -540,83 +544,90 @@ def _check_can_settings( data_sample_point: Optional[float] = None, ) -> None: """Compare requested CAN settings to active settings in driver.""" - bus_params = self._read_bus_params(channel) - # use canfd even if fd==False, bus_params.can and bus_params.canfd are a C union - bus_params_data = bus_params.canfd - settings_acceptable = True - - # check bus type - settings_acceptable &= ( - bus_params.bus_type is xldefine.XL_BusTypes.XL_BUS_TYPE_CAN - ) - - # check CAN operation mode - # skip the check if can_op_mode is 0 - # as it happens for cancaseXL, VN7600 and sometimes on other hardware (VN1640) - if bus_params_data.can_op_mode: - if fd: - settings_acceptable &= bool( - bus_params_data.can_op_mode - & xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CANFD - ) - else: - settings_acceptable &= bool( - bus_params_data.can_op_mode - & xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20 - ) - - # check bitrates - if bitrate: - settings_acceptable &= ( - abs(bus_params_data.bitrate - bitrate) < bitrate / 256 - ) - if fd and data_bitrate: - settings_acceptable &= ( - abs(bus_params_data.data_bitrate - data_bitrate) < data_bitrate / 256 + vcc_list = get_channel_configs() + for channel_index in _iterate_channel_index(channel_mask): + bus_params = self._read_bus_params( + channel_index=channel_index, vcc_list=vcc_list ) + # use bus_params.canfd even if fd==False, bus_params.can and bus_params.canfd are a C union + bus_params_data = bus_params.canfd + settings_acceptable = True - # check sample points - if sample_point: - nom_sample_point_act = ( - 100 - * (1 + bus_params_data.tseg1_abr) - / (1 + bus_params_data.tseg1_abr + bus_params_data.tseg2_abr) - ) + # check bus type settings_acceptable &= ( - abs(nom_sample_point_act - sample_point) < 2.0 # 2 percent tolerance - ) - if fd and data_sample_point: - data_sample_point_act = ( - 100 - * (1 + bus_params_data.tseg1_dbr) - / (1 + bus_params_data.tseg1_dbr + bus_params_data.tseg2_dbr) - ) - settings_acceptable &= ( - abs(data_sample_point_act - data_sample_point) - < 2.0 # 2 percent tolerance + bus_params.bus_type is xldefine.XL_BusTypes.XL_BUS_TYPE_CAN ) - if not settings_acceptable: - # The error message depends on the currently active CAN settings. - # If the active operation mode is CAN FD, show the active CAN FD timings, - # otherwise show CAN 2.0 timings. - if bool( - bus_params_data.can_op_mode - & xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CANFD - ): - active_settings = bus_params.canfd._asdict() - active_settings["can_op_mode"] = "CAN FD" - else: - active_settings = bus_params.can._asdict() - active_settings["can_op_mode"] = "CAN 2.0" - settings_string = ", ".join( - [f"{key}: {val}" for key, val in active_settings.items()] - ) - raise CanInitializationError( - f"The requested settings could not be set for channel {channel}. " - f"Another application might have set incompatible settings. " - f"These are the currently active settings: {settings_string}." - ) + # check CAN operation mode + # skip the check if can_op_mode is 0 + # as it happens for cancaseXL, VN7600 and sometimes on other hardware (VN1640) + if bus_params_data.can_op_mode: + if fd: + settings_acceptable &= bool( + bus_params_data.can_op_mode + & xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CANFD + ) + else: + settings_acceptable &= bool( + bus_params_data.can_op_mode + & xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20 + ) + + # check bitrates + if bitrate: + settings_acceptable &= ( + abs(bus_params_data.bitrate - bitrate) < bitrate / 256 + ) + if fd and data_bitrate: + settings_acceptable &= ( + abs(bus_params_data.data_bitrate - data_bitrate) + < data_bitrate / 256 + ) + + # check sample points + if sample_point: + nom_sample_point_act = ( + 100 + * (1 + bus_params_data.tseg1_abr) + / (1 + bus_params_data.tseg1_abr + bus_params_data.tseg2_abr) + ) + settings_acceptable &= ( + abs(nom_sample_point_act - sample_point) + < 2.0 # 2 percent tolerance + ) + if fd and data_sample_point: + data_sample_point_act = ( + 100 + * (1 + bus_params_data.tseg1_dbr) + / (1 + bus_params_data.tseg1_dbr + bus_params_data.tseg2_dbr) + ) + settings_acceptable &= ( + abs(data_sample_point_act - data_sample_point) + < 2.0 # 2 percent tolerance + ) + + if not settings_acceptable: + # The error message depends on the currently active CAN settings. + # If the active operation mode is CAN FD, show the active CAN FD timings, + # otherwise show CAN 2.0 timings. + if bool( + bus_params_data.can_op_mode + & xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CANFD + ): + active_settings = bus_params.canfd._asdict() + active_settings["can_op_mode"] = "CAN FD" + else: + active_settings = bus_params.can._asdict() + active_settings["can_op_mode"] = "CAN 2.0" + settings_string = ", ".join( + [f"{key}: {val}" for key, val in active_settings.items()] + ) + channel = self.index_to_channel[channel_index] + raise CanInitializationError( + f"The requested settings could not be set for channel {channel}. " + f"Another application might have set incompatible settings. " + f"These are the currently active settings: {settings_string}." + ) def _apply_filters(self, filters: Optional[CanFilters]) -> None: if filters: @@ -1239,3 +1250,10 @@ def _hw_type(hw_type: int) -> Union[int, xldefine.XL_HardwareType]: except ValueError: LOG.warning(f'Unknown XL_HardwareType value "{hw_type}"') return hw_type + + +def _iterate_channel_index(channel_mask: int) -> Iterator[int]: + """Iterate over channel indexes in channel mask.""" + for channel_index, bit in enumerate(reversed(bin(channel_mask)[2:])): + if bit == "1": + yield channel_index diff --git a/test/test_vector.py b/test/test_vector.py index 16a79c6d1..99756c41d 100644 --- a/test/test_vector.py +++ b/test/test_vector.py @@ -134,6 +134,32 @@ def test_bus_creation_channel_index() -> None: bus.shutdown() +@pytest.mark.skipif(not XLDRIVER_FOUND, reason="Vector XL API is unavailable") +def test_bus_creation_multiple_channels() -> None: + bus = can.Bus( + channel="0, 1", + bitrate=1_000_000, + serial=_find_virtual_can_serial(), + interface="vector", + ) + assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_20 + assert len(bus.channels) == 2 + assert bus.mask == 3 + + xl_channel_config_0 = _find_xl_channel_config( + serial=_find_virtual_can_serial(), channel=0 + ) + assert xl_channel_config_0.busParams.data.can.bitRate == 1_000_000 + + xl_channel_config_1 = _find_xl_channel_config( + serial=_find_virtual_can_serial(), channel=1 + ) + assert xl_channel_config_1.busParams.data.can.bitRate == 1_000_000 + + bus.shutdown() + + def test_bus_creation_bitrate_mocked(mock_xldriver) -> None: bus = can.Bus(channel=0, interface="vector", bitrate=200_000, _testing=True) assert isinstance(bus, canlib.VectorBus) @@ -836,6 +862,12 @@ def test_vector_subtype_error_from_generic() -> None: raise specific +def test_iterate_channel_index() -> None: + channel_mask = 0x23 # 100011 + channels = list(canlib._iterate_channel_index(channel_mask)) + assert channels == [0, 1, 5] + + @pytest.mark.skipif( sys.byteorder != "little", reason="Test relies on little endian data." )