Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Structure for FreeBSD preserving backwards compatibility #100

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion smbus2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from .smbus2 import SMBus, i2c_msg, I2cFunc # noqa: F401
from .smbus2 import SMBus, SMBusFreeBSD, i2c_msg, I2cFunc # noqa: F401

__version__ = "0.4.3"
__all__ = ["SMBus", "i2c_msg", "I2cFunc"]
77 changes: 74 additions & 3 deletions smbus2/smbus2.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import sys
from fcntl import ioctl
from ctypes import c_uint32, c_uint8, c_uint16, c_char, POINTER, Structure, Array, Union, create_string_buffer, string_at
import platform


# Commands from uapi/linux/i2c-dev.h
Expand Down Expand Up @@ -259,8 +260,16 @@ def create(*i2c_msg_instances):

#############################################################

def get_system():
return platform.system()


def get_architecture():
return platform.architecture()


class SMBus(object):
system = None

def __init__(self, bus=None, force=False):
"""
Expand All @@ -274,22 +283,35 @@ def __init__(self, bus=None, force=False):
already using it.
:type force: boolean
"""
if SMBus.system is None:
SMBus.system = get_system()
self.fd = None
self.funcs = I2cFunc(0)
if bus is not None:
self.open(bus)
# if bus is not None:
# self.open(bus)
self.address = None
self.bus = bus
self.force = force
self._force_last = None
self._pec = 0

def __enter__(self):
"""Enter handler."""
return self
if SMBus.system == 'FreeBSD':
self.freebsd = SMBusFreeBSD(self.bus, self.force)
if self.bus is not None:
self.freebsd.open(self.freebsd.bus)
return self.freebsd
else:
if self.bus is not None:
self.open(self.bus)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit handler."""
self.close()
if 'freebsd' in vars(self):
self.freebsd.close()

def open(self, bus):
"""
Expand Down Expand Up @@ -656,3 +678,52 @@ def i2c_rdwr(self, *i2c_msgs):
"""
ioctl_data = i2c_rdwr_ioctl_data.create(*i2c_msgs)
ioctl(self.fd, I2C_RDWR, ioctl_data)


class SMBusFreeBSD(SMBus):
bits = None

def __init__(self, bus=None, force=False):
SMBus.__init__(self, bus, force)

# FreeBSD specific intialization stuff here
if SMBusFreeBSD.bits is None:
(SMBusFreeBSD.bits, _) = get_architecture()
self.I2CRDWR = {'64bit': 0x80106906, '32bit': 0x80086906}[SMBusFreeBSD.bits]

def __enter__(self):
"""Enter handler."""
if self.bus is not None:
self.open(self.bus)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit handler."""
self.close()

def open(self, bus):
"""
Open a given i2c bus on FreeBSD

:param bus: i2c bus number (e.g. 0 or 1)
or an absolute file path (e.g. '/dev/iic-1').
:type bus: int or str
:raise TypeError: if type(bus) is not in (int, str)
"""
if isinstance(bus, int):
filepath = "/dev/iic-{}".format(bus)
elif isinstance(bus, str):
filepath = bus
else:
raise TypeError("Unexpected type(bus)={}".format(type(bus)))

self.fd = os.open(filepath, os.O_RDWR)
self.funcs = self._get_funcs()

def _set_address(self, address, force=None):
return

def write_byte(self, i2c_addr, value, force=None):
address = i2c_addr << 1 | I2C_M_RD
msg = i2c_msg.write(address, value)
self.i2c_rdwr(msg)
88 changes: 87 additions & 1 deletion tests/test_smbus2.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
except ImportError:
import mock # noqa: F401

from smbus2 import SMBus, i2c_msg, I2cFunc
from smbus2 import SMBus, SMBusFreeBSD, i2c_msg, I2cFunc


##########################################################################
Expand Down Expand Up @@ -66,6 +66,7 @@ def mock_open(*args):


def mock_close(*args):
print("Mocking close: %s" % args[0])
assert args[0] == MOCK_FD


Expand Down Expand Up @@ -102,10 +103,30 @@ def mock_ioctl(fd, command, msg):
raise IOError("Mocking SMBus Quick failed")


# Mock platform.system function for Linux testing
def mock_get_system_linux():
print('Mocking get_system() for Linux')
return 'Linux'


# Mock platform.system functions for FreeBSD testing
def mock_get_system_freebsd():
print('Mocking get_system() for FreeBSD')
return 'FreeBSD'


# Mock platform.architecture functions for FreeBSD testing
def mock_get_arch():
return ('64bit', 'ELF')


# Override open, close and ioctl with our mock functions
open_mock = mock.patch('smbus2.smbus2.os.open', mock_open)
close_mock = mock.patch('smbus2.smbus2.os.close', mock_close)
ioctl_mock = mock.patch('smbus2.smbus2.ioctl', mock_ioctl)
linux_system_mock = mock.patch('smbus2.smbus2.get_system', mock_get_system_linux)
freebsd_system_mock = mock.patch('smbus2.smbus2.get_system', mock_get_system_freebsd)
arch_mock = mock.patch('smbus2.smbus2.get_architecture', mock_get_arch)
##########################################################################

# Common error messages
Expand All @@ -117,17 +138,22 @@ def setUp(self):
open_mock.start()
close_mock.start()
ioctl_mock.start()
linux_system_mock.start()
arch_mock.start()

def tearDown(self):
open_mock.stop()
close_mock.stop()
ioctl_mock.stop()
linux_system_mock.stop()
arch_mock.stop()


# Test cases
class TestSMBus(SMBusTestCase):
def test_func(self):
bus = SMBus(1)
bus.open(1)
print("\nSupported I2C functionality: %x" % bus.funcs)
bus.close()

Expand Down Expand Up @@ -158,6 +184,7 @@ def test_read(self):
res3 = []

bus = SMBus(1)
bus.open(1)

# Read bytes
for k in range(2):
Expand All @@ -183,6 +210,7 @@ def test_read(self):

def test_quick(self):
bus = SMBus(1)
bus.open(1)
self.assertRaises(IOError, bus.write_quick, 80)

def test_pec(self):
Expand All @@ -191,6 +219,7 @@ def set_pec(bus, enable=True):

# Enabling PEC should fail (no mocked PEC support)
bus = SMBus(1)
bus.open(1)
self.assertRaises(IOError, set_pec, bus, True)
self.assertRaises(IOError, set_pec, bus, 1)
self.assertEqual(bus.pec, 0)
Expand All @@ -211,6 +240,15 @@ def test_func(self):
self.assertTrue(bus.funcs & I2cFunc.I2C > 0)
self.assertTrue(bus.funcs & I2cFunc.SMBUS_QUICK > 0)

def test_repeated_with(self):
bus = SMBus(1)
with bus:
x = bus.read_i2c_block_data(80, 0, 2)
self.assertEqual(len(x), 2, msg=INCORRECT_LENGTH_MSG)
with bus:
y = bus.read_i2c_block_data(80, 0, 2)
self.assertEqual(x, y, msg="Results differ")

def test_read(self):
res = []
res2 = []
Expand Down Expand Up @@ -264,3 +302,51 @@ def test_i2c_msg(self):
k += 1
self.assertEqual(k, 10, msg='Incorrect length')
self.assertEqual(s, 55, msg='Incorrect sum')


# FreeBSD test cases
class SMBusFreeBSDTestCase(unittest.TestCase):
def setUp(self):
open_mock.start()
close_mock.start()
ioctl_mock.start()
freebsd_system_mock.start()
arch_mock.start()
SMBus.system = None # Reset OS detection

def tearDown(self):
open_mock.stop()
close_mock.stop()
ioctl_mock.stop()
freebsd_system_mock.stop()
arch_mock.stop()


class TestSMBusFreeBSD(SMBusFreeBSDTestCase):
def test_freebsd_explicit(self):
bus = SMBusFreeBSD(1)
self.assertEqual(type(bus).__name__, 'SMBusFreeBSD')

def test_freebsd_with(self):
with SMBusFreeBSD(1) as bus:
self.assertEqual(type(bus).__name__, 'SMBusFreeBSD')

def test_freebsd_detected(self):
with SMBus(1) as bus:
self.assertTrue(bus.funcs & I2cFunc.I2C > 0)
self.assertTrue(bus.funcs & I2cFunc.SMBUS_QUICK > 0)
self.assertEqual(type(bus).__name__, 'SMBusFreeBSD')

def test_freebsd_enter_exit(self):
for id in (1, '/dev/i2c-alias'):
with SMBus(id) as bus:
self.assertEqual(type(bus).__name__, 'SMBusFreeBSD')
self.assertIsNotNone(bus.fd)
self.assertIsNone(bus.fd, None)

with SMBus() as bus:
self.assertEqual(type(bus).__name__, 'SMBusFreeBSD')
self.assertIsNone(bus.fd)
bus.open(2)
self.assertIsNotNone(bus.fd)
self.assertIsNone(bus.fd)