Skip to content

Commit

Permalink
Style: Update files to match flake8 style, with override
Browse files Browse the repository at this point in the history
  • Loading branch information
Domenic Horner authored and Domenic Horner committed Mar 26, 2021
1 parent 928659b commit fded56a
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 86 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
ignore = E501
20 changes: 10 additions & 10 deletions rpc/mux.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

import os
import binascii
import time
import struct
import itertools
import pytap2
import selectors


class MuxPacket(object):
def __init__(self, seq=0):
self.seq = seq
Expand All @@ -16,7 +15,8 @@ def __init__(self, seq=0):

def get_packet(self):
# put the final length in
self.packet = self.packet[:8] + struct.pack('<L', len(self.packet)) + self.packet[8+4:]
self.packet = self.packet[:8] + \
struct.pack('<L', len(self.packet)) + self.packet[8 + 4:]
return self.packet

def append_tag(self, tag, data=b'', extra=0):
Expand All @@ -27,15 +27,17 @@ def append_tag(self, tag, data=b'', extra=0):
else:
while len(self.packet) & 3:
self.packet += b'\0'
self.packet = self.packet[:self.fwd_pointer] + struct.pack('<L', len(self.packet)) + self.packet[self.fwd_pointer+4:]
self.packet = self.packet[:self.fwd_pointer] + struct.pack(
'<L', len(self.packet)) + self.packet[self.fwd_pointer + 4:]

hdr_len = len(hdr) + 8
hdr += struct.pack('<HHL', hdr_len+len(data), extra, 0)
hdr += struct.pack('<HHL', hdr_len + len(data), extra, 0)
# that last field is the next-tag pointer
self.fwd_pointer = len(self.packet) + len(hdr) - 4

self.packet += hdr + data


class XMMMux(object):
def package(self, packet_data):
self.seq = (self.seq + 1) & 0xff
Expand All @@ -52,11 +54,10 @@ def __init__(self, path='/dev/xmm0/mux'):

self.seq = 0

raw = binascii.unhexlify('6000000000383AFFFE800000000000000000000000000001FE80000000000000D438C1FD077C00C38600248840005550000000000000000005010000000005DC03044040FFFFFFFFFFFFFFFF0000000020018004142021F50000000000000000')
pak = self.package(raw)
pkd = binascii.unhexlify('414442480000010088000000700000006000000000383AFFFE800000000000000000000000000001FE80000000000000D438C1FD077C00C38600248840005550000000000000000005010000000005DC03044040FFFFFFFFFFFFFFFF0000000020018004142021F50000000000000000414454481800000000000000000000001000000060000000')
p = MuxPacket(seq=1)
p.append_tag(b'ADBH', binascii.unhexlify('6000000000383AFFFE800000000000000000000000000001FE80000000000000D438C1FD077C00C38600248840005550000000000000000005010000000005DC03044040FFFFFFFFFFFFFFFF0000000020018004142021F50000000000000000'))
p.append_tag(b'ADBH', binascii.unhexlify(
'6000000000383AFFFE800000000000000000000000000001FE80000000000000D438C1FD077C00C38600248840005550000000000000000005010000000005DC03044040FFFFFFFFFFFFFFFF0000000020018004142021F50000000000000000'))
p.append_tag(b'ADTH', struct.pack('<LLL', 0, 0x10, 0x60))

print(binascii.hexlify(p.get_packet()))
Expand All @@ -76,7 +77,6 @@ def __init__(self, path='/dev/xmm0/mux'):
p.append_tag(b'CMDH', struct.pack('<LLLL', 1, 0, 0, 0))
os.write(self.fp, p.get_packet())


while True:
events = sel.select()
for key, mask in events:
Expand All @@ -99,7 +99,7 @@ def read_mux(self):
tail = tail[16:]
while len(tail) >= 8:
offset, length = struct.unpack('<LL', tail[:8])
puk = data[offset:offset+length]
puk = data[offset:offset + length]
# print('ai>', binascii.hexlify(puk))
self.tun.write(puk)
tail = tail[8:]
Expand Down
194 changes: 153 additions & 41 deletions rpc/open_xdatachannel.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,41 @@
#!/usr/bin/env python3

import configargparse
from pyroute2 import IPRoute
from os.path import join, abspath, dirname
import uuid
import socket
import struct
import dbus
import sys
import time

import rpc
import logging
# must do this before importing pyroute2
logging.basicConfig(level=logging.DEBUG)

import rpc
import xm_dbus

import configargparse
import time
import sys
from pyroute2 import IPRoute
from os.path import join, abspath, dirname

parser = configargparse.ArgumentParser(
description='Hacky tool to bring up XMM7x60 modem',
default_config_files=[
join(dirname(abspath(__file__)), '..', 'xmm7360.ini'),
'/etc/xmm7360'
'/etc/xmm7360',
join(dirname(abspath(__file__)), '..', 'xmm7360.ini')
],
)

parser.add_argument('-c', '--conf', is_config_file=True)
parser.add_argument('-a', '--apn', required=True, help="Network provider APN")
parser.add_argument('-n', '--nodefaultroute', action="store_true", help="Don't install modem as default route for IP traffic")
parser.add_argument('-m', '--metric', type=int, default=1000, help="Metric for default route (higher is lower priority)")
parser.add_argument('-t', '--ip-fetch-timeout', type=int, default=1, help="Retry interval in seconds when getting IP config")
parser.add_argument('-r', '--noresolv', action="store_true", help="Don't add modem-provided DNS servers to /etc/resolv.conf")
parser.add_argument('-d', '--dbus', action="store_true", help="Activate Networkmanager Connection via DBUS")
parser.add_argument('-n', '--nodefaultroute', action="store_true",
help="Don't install modem as default route for IP traffic")
parser.add_argument('-m', '--metric', type=int, default=1000,
help="Metric for default route (higher is lower priority)")
parser.add_argument('-t', '--ip-fetch-timeout', type=int, default=1,
help="Retry interval in seconds when getting IP config")
parser.add_argument('-r', '--noresolv', action="store_true",
help="Don't add modem-provided DNS servers to /etc/resolv.conf")
parser.add_argument('-d', '--dbus', action="store_true",
help="Activate Networkmanager Connection via DBUS")

cfg, unknown = parser.parse_known_args()

Expand All @@ -43,13 +51,14 @@
r.execute('UtaMsSimOpenReq')

rpc.do_fcc_unlock(r)

# disable aeroplane mode if had been FCC-locked. first and second args are probably don't-cares
rpc.UtaModeSet(r, 1)

r.execute('UtaMsCallPsAttachApnConfigReq', rpc.pack_UtaMsCallPsAttachApnConfigReq(cfg.apn), is_async=True)
r.execute('UtaMsCallPsAttachApnConfigReq',
rpc.pack_UtaMsCallPsAttachApnConfigReq(cfg.apn), is_async=True)

attach = r.execute('UtaMsNetAttachReq', rpc.pack_UtaMsNetAttachReq(), is_async=True)
attach = r.execute('UtaMsNetAttachReq',
rpc.pack_UtaMsNetAttachReq(), is_async=True)
_, status = rpc.unpack('nn', attach['body'])

if status == 0xffffffff:
Expand All @@ -58,7 +67,8 @@
while not r.attach_allowed:
r.pump()

attach = r.execute('UtaMsNetAttachReq', rpc.pack_UtaMsNetAttachReq(), is_async=True)
attach = r.execute('UtaMsNetAttachReq',
rpc.pack_UtaMsNetAttachReq(), is_async=True)
_, status = rpc.unpack('nn', attach['body'])

if status == 0xffffffff:
Expand All @@ -79,25 +89,19 @@
idx = ipr.link_lookup(ifname='wwan0')[0]

ipr.flush_addr(index=idx)
ipr.link(
'set',
index=idx,
state='up'
)
ipr.addr(
'add',
index=idx,
address=ip_addr
)
ipr.link('set',
index=idx,
state='up')
ipr.addr('add',
index=idx,
address=ip_addr)


# should a default route be added for this connection
if not cfg.nodefaultroute:
ipr.route(
'add',
dst='default',
priority=cfg.metric,
oif=idx
)
ipr.route('add',
dst='default',
priority=cfg.metric,
oif=idx)

# Add DNS values to /etc/resolv.conf
if not cfg.noresolv:
Expand All @@ -107,16 +111,124 @@
resolv.write('nameserver %s\n' % dns)

# this gives us way too much stuff, which we need
pscr = r.execute('UtaMsCallPsConnectReq', rpc.pack_UtaMsCallPsConnectReq(), is_async=True)

pscr = r.execute('UtaMsCallPsConnectReq',
rpc.pack_UtaMsCallPsConnectReq(), is_async=True)
# this gives us a handle we need
dcr = r.execute('UtaRPCPsConnectToDatachannelReq', rpc.pack_UtaRPCPsConnectToDatachannelReq())
dcr = r.execute('UtaRPCPsConnectToDatachannelReq',
rpc.pack_UtaRPCPsConnectToDatachannelReq())

csr_req = pscr['body'][:-6] + dcr['body'] + b'\x02\x04\0\0\0\0'

r.execute('UtaRPCPSConnectSetupReq', csr_req)

if not cfg.dbus:
sys.exit(1)

dbus_mgmt = xm_dbus.DBUS()
dbus_mgmt.setup_network_manager(ip_addr, dns_values)
myconnection = None
system_bus = dbus.SystemBus()
service_name = "org.freedesktop.NetworkManager"
proxy = system_bus.get_object(
service_name, "/org/freedesktop/NetworkManager/Settings")
dproxy = system_bus.get_object(service_name, "/org/freedesktop/NetworkManager")
settings = dbus.Interface(proxy, "org.freedesktop.NetworkManager.Settings")
manager = dbus.Interface(dproxy, "org.freedesktop.NetworkManager")


def dottedQuadToNum(ip):
return struct.unpack('<L', socket.inet_aton(str(ip)))[0]


def get_connections():
global myconnection, connection_path
connection_paths = settings.ListConnections()
for path in connection_paths:
con_proxy = system_bus.get_object(service_name, path)
settings_connection = dbus.Interface(
con_proxy, "org.freedesktop.NetworkManager.Settings.Connection")
config = settings_connection.GetSettings()
s_con = config["connection"]
print("name:%s uuid:%s type:%s" %
(s_con["id"], s_con["uuid"], s_con["type"]))
if s_con["id"] == 'xmm7360':
myconnection = s_con["uuid"]
connection_path = path


get_connections()

if (myconnection is not None):
print("setup %s" % myconnection)
addr = dbus.Dictionary({"address": ip_addr, "prefix": dbus.UInt32(32)})
connection_paths = settings.ListConnections()
for path in connection_paths:
con_proxy = system_bus.get_object(service_name, path)
settings_connection = dbus.Interface(
con_proxy, "org.freedesktop.NetworkManager.Settings.Connection")
config = settings_connection.GetSettings()
if config["connection"]["uuid"] != myconnection:
continue
print("setup connection")
connection_path = path
if "addresses" in config["ipv4"]:
del config["ipv4"]["addresses"]
if "address-data" in config["ipv4"]:
del config["ipv4"]["address-data"]
if "gateway" in config["ipv4"]:
del config["ipv4"]["gateway"]
if "dns" in config["ipv4"]:
del config["ipv4"]["dns"]

addr = dbus.Dictionary(
{"address": ip_addr, "prefix": dbus.UInt32(32)}
)
config["ipv4"]["address-data"] = dbus.Array(
[addr], signature=dbus.Signature("a{sv}")
)

dbus_ip = [dottedQuadToNum(ip) for ip in dns_values['v4']]

config["ipv4"]["gateway"] = ip_addr

config["ipv4"]["dns"] = dbus.Array([dbus.UInt32(ip) for ip in dbus_ip],
signature=dbus.Signature("u")
)
settings_connection.Update(config)
else:
print("adding connection")
n_con = dbus.Dictionary({"type": "generic", "uuid": str(
uuid.uuid4()), "id": "xmm7360", "interface-name": "wwan0"})
addr = dbus.Dictionary(
{"address": ip_addr, "prefix": dbus.UInt32(32)}
)

dbus_ip = [dottedQuadToNum(ip) for ip in dns_values['v4']]
n_ip4 = dbus.Dictionary(
{
"address-data": dbus.Array([addr], signature=dbus.Signature("a{sv}")),
"gateway": ip_addr,
"method": "manual",
"dns": dbus.Array([dbus.UInt32(ip) for ip in dbus_ip], signature=dbus.Signature("u"))
}
)
n_ip6 = dbus.Dictionary({"method": "ignore"})
add_con = dbus.Dictionary(
{"connection": n_con, "ipv4": n_ip4, "ipv6": n_ip6})
settings.AddConnection(add_con)
get_connections()

devices = manager.GetDevices()

for d in devices:
dev_proxy = system_bus.get_object("org.freedesktop.NetworkManager", d)
prop_iface = dbus.Interface(dev_proxy, "org.freedesktop.DBus.Properties")
props = prop_iface.GetAll("org.freedesktop.NetworkManager.Device")
if props["Interface"] == "wwan0":
devpath = d
print("found Interface: %s" % props["Interface"])
print("Managed: %s" % props["Managed"])
if props["Managed"] == 0:
print("activate")
prop_iface.Set("org.freedesktop.NetworkManager.Device",
"Managed", dbus.Boolean(1))

manager.ActivateConnection(connection_path, devpath, "/")
Loading

0 comments on commit fded56a

Please sign in to comment.