-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathUSB.py
133 lines (99 loc) · 2.83 KB
/
USB.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import Communication
import threading
import time
# 0 -> control (SETUP) endpoint
# 1 -> IN endpoint
# 0 -> OUT endpoint
CONTROL = "Control"
BULK = "Bulk"
INTERRUPT = "Interrupt"
ISOCHRONOUS = "Isochronous"
IN0 = 1
IN1 = 11
IN2 = 21
IN3 = 31
IN4 = 41
IN5 = 51
IN6 = 61
IN7 = 71
IN8 = 81
IN9 = 91
IN10 = 101
IN11 = 111
IN12 = 121
IN13 = 131
IN14 = 141
IN15 = 151
OUT0 = 0
OUT1 = 10
OUT2 = 20
OUT3 = 30
OUT4 = 40
OUT5 = 50
OUT6 = 60
OUT7 = 70
OUT8 = 80
OUT9 = 90
OUT10 = 100
OUT11 = 110
OUT12 = 120
OUT13 = 130
OUT14 = 140
OUT15 = 150
IOCTL_INTERRUPT_INTERVAL = 0
IOCTL_INTERRUPT_LENGTH = 0
class USB:
def __init__(self, vid, pid, interface):
self.vid = vid
self.pid = pid
self.interface = interface
self.comm = Communication.communication(vid, pid, interface)
#self.interruptInterval = self.comm.ioctl(vid, pid, interface, 0, 0, 0, 0, 0, 0, IN0, IOCTL_INTERRUPT_INTERVAL)
self.interruptInterval = 500
def writeInterruptHandler(self, OUTn, interval = None, callBackFunction = None):
if interval == None:
interval = self.interruptInterval
thread = threading.Thread(target = self.writeInterruptCaller, args = (INn, interval, callBackFunction,))
thread.start()
return thread
def writeInterruptCaller(self, OUTn, interval, callBackFunction = None):
while True :
data = callBackFunction()
self.writeInterrupt(len(data), data, OUTn)
time.sleep(interval/1000)
def writeInterrupt(self, size, data, OUTn):
self.comm.send(INTERRUPT, OUTn, size, data)
def readInterruptHandler(self, INn, interval = None, callBackFunction = None):
if interval == None:
interval = self.interruptInterval
thread = threading.Thread(target = self.readInterruptCaller, args = (INn, interval, callBackFunction,))
thread.start()
return thread
def readInterruptCaller(self, INn, interval, callBackFunction = None):
while True :
data = self.readInterrupt(INn)
callBackFunction(data)
time.sleep(interval/1000.0)
def readInterrupt(self, INn):
data = self.comm.recive(INTERRUPT, INn)
return data
def writeBulk(self, OUTn, size, data):
self.comm.sendData(BULK, OUTn, size, data)
def readBulk(self, INn):
data = self.comm.recive(BULK, INn)
return data
def writeControl(self, request, requestType, value, index, size, data = None):
self.comm.send(CONTROL, 1, request, requestType, value, index, size, data)
def readControl(self, request, requestType, value, index, size, data):
return self.comm.recive(CONTROL, 0, request, requestType, value, index, size, data)
def callback1(data):
print("2")
def callback2(data):
print("1")
if __name__ == "__main__":
usb = USB(10, 10, 15)
usb.writeBulk(OUT15, 8, 65)
print(usb.readBulk(IN1))
thread1 = usb.readInterruptHandler(IN1, callBackFunction = callback1)
thread2 = usb.readInterruptHandler(IN1, callBackFunction = callback2)
time.sleep(2)