diff --git a/smbus2/__init__.py b/smbus2/__init__.py index f52948f..e169a1b 100644 --- a/smbus2/__init__.py +++ b/smbus2/__init__.py @@ -20,7 +20,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from .smbus2 import SMBus, i2c_msg, I2cFunc # noqa: F401 +from .smbus2 import SMBus, SMBusFreeBSD, i2c_msg, I2cFunc # noqa: F401 __version__ = "0.4.3" __all__ = ["SMBus", "i2c_msg", "I2cFunc"] diff --git a/smbus2/smbus2.py b/smbus2/smbus2.py index e9d7477..1beb112 100644 --- a/smbus2/smbus2.py +++ b/smbus2/smbus2.py @@ -24,6 +24,7 @@ import sys from fcntl import ioctl from ctypes import c_uint32, c_uint8, c_uint16, c_char, POINTER, Structure, Array, Union, create_string_buffer, string_at +import platform # Commands from uapi/linux/i2c-dev.h @@ -259,8 +260,16 @@ def create(*i2c_msg_instances): ############################################################# +def get_system(): + return platform.system() + + +def get_architecture(): + return platform.architecture() + class SMBus(object): + system = None def __init__(self, bus=None, force=False): """ @@ -274,22 +283,35 @@ def __init__(self, bus=None, force=False): already using it. :type force: boolean """ + if SMBus.system is None: + SMBus.system = get_system() self.fd = None self.funcs = I2cFunc(0) - if bus is not None: - self.open(bus) + # if bus is not None: + # self.open(bus) self.address = None + self.bus = bus self.force = force self._force_last = None self._pec = 0 def __enter__(self): """Enter handler.""" - return self + if SMBus.system == 'FreeBSD': + self.freebsd = SMBusFreeBSD(self.bus, self.force) + if self.bus is not None: + self.freebsd.open(self.freebsd.bus) + return self.freebsd + else: + if self.bus is not None: + self.open(self.bus) + return self def __exit__(self, exc_type, exc_val, exc_tb): """Exit handler.""" self.close() + if 'freebsd' in vars(self): + self.freebsd.close() def open(self, bus): """ @@ -656,3 +678,52 @@ def i2c_rdwr(self, *i2c_msgs): """ ioctl_data = i2c_rdwr_ioctl_data.create(*i2c_msgs) ioctl(self.fd, I2C_RDWR, ioctl_data) + + +class SMBusFreeBSD(SMBus): + bits = None + + def __init__(self, bus=None, force=False): + SMBus.__init__(self, bus, force) + + # FreeBSD specific intialization stuff here + if SMBusFreeBSD.bits is None: + (SMBusFreeBSD.bits, _) = get_architecture() + self.I2CRDWR = {'64bit': 0x80106906, '32bit': 0x80086906}[SMBusFreeBSD.bits] + + def __enter__(self): + """Enter handler.""" + if self.bus is not None: + self.open(self.bus) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit handler.""" + self.close() + + def open(self, bus): + """ + Open a given i2c bus on FreeBSD + + :param bus: i2c bus number (e.g. 0 or 1) + or an absolute file path (e.g. '/dev/iic-1'). + :type bus: int or str + :raise TypeError: if type(bus) is not in (int, str) + """ + if isinstance(bus, int): + filepath = "/dev/iic-{}".format(bus) + elif isinstance(bus, str): + filepath = bus + else: + raise TypeError("Unexpected type(bus)={}".format(type(bus))) + + self.fd = os.open(filepath, os.O_RDWR) + self.funcs = self._get_funcs() + + def _set_address(self, address, force=None): + return + + def write_byte(self, i2c_addr, value, force=None): + address = i2c_addr << 1 | I2C_M_RD + msg = i2c_msg.write(address, value) + self.i2c_rdwr(msg) diff --git a/tests/test_smbus2.py b/tests/test_smbus2.py index 378664b..cfc1192 100644 --- a/tests/test_smbus2.py +++ b/tests/test_smbus2.py @@ -27,7 +27,7 @@ except ImportError: import mock # noqa: F401 -from smbus2 import SMBus, i2c_msg, I2cFunc +from smbus2 import SMBus, SMBusFreeBSD, i2c_msg, I2cFunc ########################################################################## @@ -66,6 +66,7 @@ def mock_open(*args): def mock_close(*args): + print("Mocking close: %s" % args[0]) assert args[0] == MOCK_FD @@ -102,10 +103,30 @@ def mock_ioctl(fd, command, msg): raise IOError("Mocking SMBus Quick failed") +# Mock platform.system function for Linux testing +def mock_get_system_linux(): + print('Mocking get_system() for Linux') + return 'Linux' + + +# Mock platform.system functions for FreeBSD testing +def mock_get_system_freebsd(): + print('Mocking get_system() for FreeBSD') + return 'FreeBSD' + + +# Mock platform.architecture functions for FreeBSD testing +def mock_get_arch(): + return ('64bit', 'ELF') + + # Override open, close and ioctl with our mock functions open_mock = mock.patch('smbus2.smbus2.os.open', mock_open) close_mock = mock.patch('smbus2.smbus2.os.close', mock_close) ioctl_mock = mock.patch('smbus2.smbus2.ioctl', mock_ioctl) +linux_system_mock = mock.patch('smbus2.smbus2.get_system', mock_get_system_linux) +freebsd_system_mock = mock.patch('smbus2.smbus2.get_system', mock_get_system_freebsd) +arch_mock = mock.patch('smbus2.smbus2.get_architecture', mock_get_arch) ########################################################################## # Common error messages @@ -117,17 +138,22 @@ def setUp(self): open_mock.start() close_mock.start() ioctl_mock.start() + linux_system_mock.start() + arch_mock.start() def tearDown(self): open_mock.stop() close_mock.stop() ioctl_mock.stop() + linux_system_mock.stop() + arch_mock.stop() # Test cases class TestSMBus(SMBusTestCase): def test_func(self): bus = SMBus(1) + bus.open(1) print("\nSupported I2C functionality: %x" % bus.funcs) bus.close() @@ -158,6 +184,7 @@ def test_read(self): res3 = [] bus = SMBus(1) + bus.open(1) # Read bytes for k in range(2): @@ -183,6 +210,7 @@ def test_read(self): def test_quick(self): bus = SMBus(1) + bus.open(1) self.assertRaises(IOError, bus.write_quick, 80) def test_pec(self): @@ -191,6 +219,7 @@ def set_pec(bus, enable=True): # Enabling PEC should fail (no mocked PEC support) bus = SMBus(1) + bus.open(1) self.assertRaises(IOError, set_pec, bus, True) self.assertRaises(IOError, set_pec, bus, 1) self.assertEqual(bus.pec, 0) @@ -211,6 +240,15 @@ def test_func(self): self.assertTrue(bus.funcs & I2cFunc.I2C > 0) self.assertTrue(bus.funcs & I2cFunc.SMBUS_QUICK > 0) + def test_repeated_with(self): + bus = SMBus(1) + with bus: + x = bus.read_i2c_block_data(80, 0, 2) + self.assertEqual(len(x), 2, msg=INCORRECT_LENGTH_MSG) + with bus: + y = bus.read_i2c_block_data(80, 0, 2) + self.assertEqual(x, y, msg="Results differ") + def test_read(self): res = [] res2 = [] @@ -264,3 +302,51 @@ def test_i2c_msg(self): k += 1 self.assertEqual(k, 10, msg='Incorrect length') self.assertEqual(s, 55, msg='Incorrect sum') + + +# FreeBSD test cases +class SMBusFreeBSDTestCase(unittest.TestCase): + def setUp(self): + open_mock.start() + close_mock.start() + ioctl_mock.start() + freebsd_system_mock.start() + arch_mock.start() + SMBus.system = None # Reset OS detection + + def tearDown(self): + open_mock.stop() + close_mock.stop() + ioctl_mock.stop() + freebsd_system_mock.stop() + arch_mock.stop() + + +class TestSMBusFreeBSD(SMBusFreeBSDTestCase): + def test_freebsd_explicit(self): + bus = SMBusFreeBSD(1) + self.assertEqual(type(bus).__name__, 'SMBusFreeBSD') + + def test_freebsd_with(self): + with SMBusFreeBSD(1) as bus: + self.assertEqual(type(bus).__name__, 'SMBusFreeBSD') + + def test_freebsd_detected(self): + with SMBus(1) as bus: + self.assertTrue(bus.funcs & I2cFunc.I2C > 0) + self.assertTrue(bus.funcs & I2cFunc.SMBUS_QUICK > 0) + self.assertEqual(type(bus).__name__, 'SMBusFreeBSD') + + def test_freebsd_enter_exit(self): + for id in (1, '/dev/i2c-alias'): + with SMBus(id) as bus: + self.assertEqual(type(bus).__name__, 'SMBusFreeBSD') + self.assertIsNotNone(bus.fd) + self.assertIsNone(bus.fd, None) + + with SMBus() as bus: + self.assertEqual(type(bus).__name__, 'SMBusFreeBSD') + self.assertIsNone(bus.fd) + bus.open(2) + self.assertIsNotNone(bus.fd) + self.assertIsNone(bus.fd)