-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathCommunication.py
93 lines (76 loc) · 2.81 KB
/
Communication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import fcntl
from bitarray import bitarray
import os
import struct
import array
OUT = 0
IN = 1
class communication:
def __init__(self, vid, pid, interface):
self.vid = vid
self.pid = pid
self.interface = interface
self.root = "/home/samir/Documents/dev/uframe" + "/" + self.vid + "/" + self.pid + "/" + self.interface + "/"
self.interruptWordLength = 0
def checkForEndPointNodes(self, endPoint):
for OUTn in range(0, 150, 10):
if os.path.exists(self.getPath(endPoint, OUTn)) == False:
break
print( "OUT" + str( (int)(OUTn/10) ) )
for INn in range(1, 151, 10):
if os.path.exists(self.getPath(endPoint, INn)) == False:
break
print( "IN" + str( (int)(OUTn/10) ) )
def availableEndpoints(self, endPoint):
if endPoint == 0:
print("control :")
self.checkForEndPointNodes(endPoint)
elif endPoint == 1:
print("bulk :")
self.checkForEndPointNodes(endPoint)
elif endPoint == 2:
print("interrupt :")
self.checkForEndPointNodes(endPoint)
elif endPoint == 3:
print("isochronous:")
self.checkForEndPointNodes(endPoint)
def recive(self, endPoint, INn, request = None, requestType = None, value = None, index = None, size = None, data = None, operation = 1):
fd = self.getPath(endPoint, INn)
print(fd)
if request == None:
if os.path.exists(fd):
file = open(fd, "r")
data = file.read(self.interruptWordLength)
file.close()
return data
else:
print("no such endpoint ,printing available nodes :")
self.availableEndpoints(endPoint)
else :
file = open(fd, "r+")
byteBuffer = bytearray(self.formRequestPacket(request, requestType, value, index, size))
for d in data :
byteBuffer.append(d)
tempBuffer = array.array("B", byteBuffer)
buffer = array.array("c", tempBuffer.tostring())
fcntl.ioctl(file, operation, buffer, 1)
file.close()
return buffer[8:].tostring()
def send(self, endPoint, OUTn, request = None, requestType = None, value = None, index = None, size = None, data = None):
fd = self.getPath(endPoint, OUTn)
messege = ""
if os.path.exists(fd):
if request != None:
messege = messege + self.formRequestPacket(request, requestType, value, index, size)
if data != None:
messege = messege + data
file = open(fd, "w")
file.write(messege)
file.close()
else:
print("no such endpoint ,printing available nodes :")
self.availableEndpoints(endPoint)
def getPath(self, endPoint, IOn):
return self.root + endPoint + "/" + "{0:03}".format(IOn)
def formRequestPacket(self, request, requestType, value, index, size):
return bitarray("{0:08b}".format(request) + "{0:08b}".format(requestType) + "{0:<016b}".format(value) + "{0:<016b}".format(index) + "{0:<016b}".format(size)).tobytes()