-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathposix_communication.py
111 lines (90 loc) · 3.68 KB
/
posix_communication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import fcntl
from bitarray import bitarray
import os
import struct
import array
import threading
import time
from helper import Helper
from config import *
class Communication:
    """Minimal I/O wrapper around a device node addressed by a filesystem path.

    The path is opened afresh for every operation and closed again before
    the method returns, so no file handle is kept on the instance.
    """

    # Number of bytes read by receive() when the caller gives no usable size.
    DEFAULT_READ_SIZE = 16

    def __init__(self, path):
        """Remember *path*.

        Raises:
            ValueError: if *path* does not exist at construction time.
        """
        if not os.path.exists(path):
            raise ValueError("Path does not exist")
        self.path = path

    def send(self, data):
        """Write *data* to the device, truncating any previous content.

        Text is written through a text-mode file, bytes through a
        binary-mode file, so callers can pass back what receive() returned.
        """
        mode = "wb" if isinstance(data, bytes) else "w"
        # The context manager closes the file even when write() raises.
        with open(self.path, mode) as device:
            device.write(data)

    def receive(self, size=0):
        """Read up to *size* bytes from the device with a single os.read call.

        Args:
            size: maximum number of bytes to read; 0 (the default), None or
                a negative value selects DEFAULT_READ_SIZE.

        Returns:
            The data read (possibly shorter than requested), or None when
            the device could not be opened or read; the error is printed.
        """
        count = size if size and size > 0 else self.DEFAULT_READ_SIZE
        try:
            with open(self.path, "r") as device:
                # Read straight from the descriptor to bypass text decoding
                # and Python-level buffering.
                return os.read(device.fileno(), count)
        except (IOError, OSError) as e:
            # Best-effort contract: report the failure and signal it with None.
            print(e)
            return None

    def device_control(self, operation, buffer):
        """Issue ioctl *operation* on the device using *buffer*.

        *buffer* is handed to fcntl.ioctl with the mutate flag set, so a
        mutable buffer is updated in place; the same object is returned.
        """
        with open(self.path, "r") as device:
            fcntl.ioctl(device, operation, buffer, 1)
        return buffer
class Bulk(Communication):
    """Endpoint located at <device_root>/<vid>/<pid>/<interface>/<type>/<NNN>.

    On construction the endpoint is queried through ioctl request 3 and the
    five integers it returns are stored as ``type``, ``direction``,
    ``endpoint_address``, ``interval`` and ``buffer_size``.
    """

    def __init__(self, vid, pid, interface, endpoint, endpoint_type="Bulk",
                 device_root="/home/sayed/dev/"):
        """Locate the endpoint node and load its descriptor values.

        Args:
            vid, pid, interface: path components (strings) selecting the
                device and interface directory.
            endpoint: endpoint number; formatted as a zero-padded
                three-digit file name.
            endpoint_type: name of the directory that holds the endpoint.
            device_root: directory under which the device tree lives. The
                default keeps the location that used to be hard-coded.

        Raises:
            ValueError: if the resulting path does not exist.
        """
        self.path = os.path.join(
            device_root, vid, pid, interface, endpoint_type,
            "{0:03}".format(endpoint))
        Communication.__init__(self, self.path)

        # Five zero-initialised C ints for the driver to fill in.
        # NOTE(review): request number 3 is a magic constant here; it
        # presumably selects a "describe endpoint" operation - confirm
        # against the driver.
        descriptor = self.device_control(3, array.array("i", [0] * 5))

        self.type = descriptor[0]
        self.direction = descriptor[1]
        self.endpoint_address = descriptor[2]
        self.interval = descriptor[3]
        self.buffer_size = descriptor[4]

        # The values were always echoed to stdout; kept so existing output
        # does not change.
        for value in (self.type, self.direction, self.endpoint_address,
                      self.interval, self.buffer_size):
            print(value)
class Interrupt(Bulk):
    """Interrupt endpoint polled periodically from background threads.

    The polling period is ``self.interval / 1000.0`` seconds, i.e. the
    interval loaded by the base class is treated as milliseconds.
    """

    def __init__(self, vid, pid, interface, endpoint):
        """Open the endpoint stored under the "Interrupt" directory."""
        Bulk.__init__(self, vid, pid, interface, endpoint, "Interrupt")

    @staticmethod
    def _check_callback(callback_function):
        """Reject a missing callback before a thread is started for it."""
        if not callable(callback_function):
            raise TypeError("callback_function must be callable")

    def write_interrupt_handler(self, callback_function=None):
        """Start a thread that repeatedly sends ``callback_function()``.

        Returns:
            The started threading.Thread. The worker loops forever; no
            stop mechanism is provided.

        Raises:
            TypeError: if *callback_function* is not callable.
        """
        self._check_callback(callback_function)
        # The worker takes only the callback; path and interval are read
        # from self. Passing them as extra positional arguments made the
        # thread fail with TypeError as soon as it started.
        thread = threading.Thread(target=self.__write_interrupt_caller,
                                  args=(callback_function,))
        thread.start()
        return thread

    def __write_interrupt_caller(self, callback_function=None):
        """Worker loop: fetch data from the callback, send it, then sleep."""
        while True:
            data = callback_function()
            self.send(data)
            time.sleep(self.interval / 1000.0)

    def read_interrupt_handler(self, callback_function=None):
        """Start a thread that repeatedly passes received data to the callback.

        Returns:
            The started threading.Thread. The worker loops forever; no
            stop mechanism is provided.

        Raises:
            TypeError: if *callback_function* is not callable.
        """
        self._check_callback(callback_function)
        thread = threading.Thread(target=self.__read_interrupt_caller,
                                  args=(callback_function,))
        thread.start()
        return thread

    def __read_interrupt_caller(self, callback_function=None):
        """Worker loop: receive, hand the result to the callback, then sleep."""
        while True:
            data = self.receive()
            callback_function(data)
            time.sleep(self.interval / 1000.0)
class Control(Communication):
    """Control endpoint located at <device_root>/<vid>/<pid>/<interface>/Control/<NNN>."""

    def __init__(self, vid, pid, interface, endpoint,
                 device_root="/home/sayed/dev/"):
        """Locate the control endpoint node.

        Args:
            vid, pid, interface: path components (strings) selecting the
                device and interface directory.
            endpoint: endpoint number; formatted as a zero-padded
                three-digit file name.
            device_root: directory under which the device tree lives. The
                default keeps the location that used to be hard-coded.

        Raises:
            ValueError: if the resulting path does not exist.
        """
        self.path = os.path.join(
            device_root, vid, pid, interface, "Control",
            "{0:03}".format(endpoint))
        Communication.__init__(self, self.path)

    def form_request_packet(self, request, request_type, value, index, size):
        """Return the 8-byte request header as bytes.

        Layout: request (1 byte), request_type (1 byte), then value, index
        and size as 16-bit unsigned integers, most significant byte first.

        The previous bit-string formatting used a left-aligned "<016b"
        spec, which pads zeros on the right, so any field narrower than 16
        bits was shifted (1 was encoded as 0x8000). Fixed-width packing
        yields the same bytes for full-width values and correct ones
        otherwise.

        Raises:
            struct.error: if a field does not fit its width.
        """
        # NOTE(review): receive() packs the same fields in native byte
        # order ("BBHHH"); confirm which order the driver expects on the
        # write path.
        return struct.pack(">BBHHH", request, request_type, value, index, size)

    def send(self, request, request_type, value, index, size, data):
        """Write the request header followed by *data* to the device.

        *data* may be bytes or text; text is encoded as Latin-1 so every
        character becomes exactly one byte.
        """
        header = self.form_request_packet(request, request_type, value, index, size)
        if not isinstance(data, bytes):
            data = data.encode("latin-1")
        # Written in binary mode here: the packet is raw bytes, and
        # converting the header with str() on Python 3 would emit its
        # "b'...'" repr instead of the bytes themselves.
        with open(self.path, "wb") as device:
            device.write(header + data)

    def receive(self, request, request_type, value, index, size, data):
        """Perform a control read through the IOCTL_CONTROL_READ ioctl.

        The header fields and a *size*-byte data area initialised from
        *data* are packed into one buffer, passed to the ioctl, and
        everything after the 8-byte header is returned.

        Returns:
            An array.array holding the buffer content after the header.
        """
        if not isinstance(data, bytes):
            data = data.encode("latin-1")
        packed = struct.pack("BBHHH{width}s".format(width=size),
                             request, request_type, value, index, size, data)
        # The "c" typecode exists only on Python 2; "B" is the byte-wise
        # typecode available on Python 3.
        typecode = "c" if str is bytes else "B"
        buffer = array.array(typecode, packed)
        return Communication.device_control(self, IOCTL_CONTROL_READ, buffer)[8:]