Skip to content

Commit

Permalink
RAConnect: use serial backend instead of pyusb
Browse files Browse the repository at this point in the history
  • Loading branch information
robinkrens committed Feb 18, 2024
1 parent 283880d commit 313bddb
Showing 1 changed file with 29 additions and 37 deletions.
66 changes: 29 additions & 37 deletions raflash/RAConnect.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

import sys
import time
import serial
import usb.core
import usb.util
import serial.tools.list_ports
from raflash.RAPacker import *

MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
Expand All @@ -28,62 +29,53 @@ class RAConnect:
def __init__(self, vendor_id, product_id):
self.vendor_id = vendor_id
self.product_id = product_id
self.ep_in = 0x81
self.ep_out = 0x02
self.max_tries = 20
self.timeout_ms = 100
self.dev = None
self.rx_ep = None
self.tx_ep = None
self.chip_layout = []
self.sel_area = 0 # default to Area 0

self.find_device()
status_conn = self.inquire_connection()
if not status_conn:
self.confirm_connection()

def find_port(self):
ports = serial.tools.list_ports.comports()
for p in ports:
if p.vid == self.vendor_id:
return p.device

def find_device(self):
self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
if self.dev is None:
raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
return None

for config in self.dev:
intf = config[(1, 0)]
product_name = usb.util.get_string(self.dev, self.dev.iProduct)
print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
print("Found kernel driver, detaching ... ")
self.dev.detach_kernel_driver(intf.bInterfaceNumber)
for ep in intf:
if (ep.bmAttributes == 0x02):
if ep.bEndpointAddress == self.ep_in:
self.rx_ep = ep
elif ep.bEndpointAddress == self.ep_out:
self.tx_ep = ep
return True

raise ValueError("Device does not have a CDC interface")
def find_device(self):
port = self.find_port()
try:
self.dev = serial.Serial(port, 9600)
except Exception as err:
raise Exception(f'Failed to open port {err}')

def inquire_connection(self):
packed = pack_pkt(INQ_CMD, "")
self.send_data(packed)
info = self.recv_data(7)
info = self.recv_data(1)
if info == bytearray(b'\x00') or info == bytearray(b''):
return False
else:
info += self.recv_data(6)
msg = unpack_pkt(info)
# print("Connection already established")
return True

def confirm_connection(self):
for i in range(self.max_tries):
try:
self.tx_ep.write(bytes([0x55]), self.timeout_ms)
ret = self.rx_ep.read(1, self.timeout_ms)
self.dev.write(bytes([0x55]))
ret = self.dev.read(1)
if ret[0] == 0xC3:
print("Reply received (0xC3)")
return True
except usb.core.USBError as e:
except Exception as e:
print(f"Timeout: retry #{i}", e)
return False

Expand All @@ -96,29 +88,29 @@ def set_chip_layout(self, cfg):
self.chip_layout = cfg

def send_data(self, packed_data):
if self.tx_ep is None:
if self.dev is None:
return False
try:
self.tx_ep.write(packed_data, self.timeout_ms)
except usb.core.USBError as e:
print("Timeout: error", e)
self.dev.write(packed_data)
except Exception as err:
print("Timeout: error", err)
return False
return True

def recv_data(self, exp_len, timeout=100):
msg = bytearray(b'')
if exp_len > MAX_TRANSFER_SIZE:
raise ValueError(f"length package {exp_len} over max transfer size")
if self.rx_ep is None:
if self.dev is None:
return False
try:
received = 0
while received != exp_len:
buf = self.rx_ep.read(exp_len, timeout)
buf = self.dev.read(exp_len)
msg += buf
received += len(buf)
if received == exp_len:
return msg
except usb.core.USBError as e:
print("Timeout: error", e)
except Exception as err:
print("Timeout: error", err)
return msg

0 comments on commit 313bddb

Please sign in to comment.