-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGPIOChannel.py
151 lines (115 loc) · 4.01 KB
/
GPIOChannel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""This module provides object abstraction for buffered Raspberry Pi GPIO channel"""
from typing import AsyncContextManager
import RPi.GPIO as GPIO
import time
from contextlib import AbstractContextManager, suppress
from threading import Thread
from NonBlockQueue import KeepNewQueue, Empty
from tsync import RWLock
__all__ = ['GPIOInput', 'AlreadyStarted', 'AlreadyStopped', 'GPIOManager']
class AlreadyStarted(Exception):
pass
class AlreadyStopped(Exception):
pass
class GPIOInput:
"""Object abstraction for buffered Raspberry Pi GPIO input channel"""
def __init__(self, channel, edge, buffer_size, pull_up_down=GPIO.PUD_OFF, bouncetime=0):
self._channel = channel
self._edge = edge
self._buffer_size = buffer_size
self._pull_up_down = pull_up_down
self._bouncetime = bouncetime
self._buffer = KeepNewQueue(buffer_size)
self._buffer_put_lock = RWLock()
self._buffer_get_lock = RWLock()
self._started = False
@property
def channel(self):
return self._channel
@property
def edge(self):
return self._edge
@edge.setter
def edge(self, edge):
if self._edge != edge:
self._edge = edge
if self._started:
self.restart()
@property
def buffer_size(self):
return self._buffer_size
@buffer_size.setter
def buffer_size(self, buffer_size):
if self._buffer_size != buffer_size:
self._buffer_size = buffer_size
buffer = KeepNewQueue(buffer_size)
Thread(target=self._change_buffer, args=(buffer,)).start()
@property
def pull_up_down(self):
return self._pull_up_down
@pull_up_down.setter
def pull_up_down(self, pull_up_down):
if self._pull_up_down != pull_up_down:
self._pull_up_down = pull_up_down
if self._started:
self.restart()
@property
def bouncetime(self):
return self._bouncetime
@bouncetime.setter
def bouncetime(self, bouncetime):
if self._bouncetime != bouncetime:
self._bouncetime = bouncetime
if self._started:
self.restart()
def start(self, edge=None, pull_up_down=None, bouncetime=None):
if self._started:
raise AlreadyStarted
if edge is not None:
self._edge = edge
if pull_up_down is not None:
self._pull_up_down = pull_up_down
if bouncetime is not None:
self._bouncetime = bouncetime
GPIO.setup(self._channel, GPIO.IN, pull_up_down=self._pull_up_down)
GPIO.add_event_detect(
self._channel, self._edge, callback=self._event_callback, bouncetime=self._bouncetime)
self.started = True
def stop(self):
if not self.started:
raise AlreadyStopped
GPIO.cleanup(self._channel)
self.started = False
def restart(self, *args, **kwargs):
self.stop()
self.start(*args, **kwargs)
def get(self):
if not self._buffer_get_lock.rlock.acquire(blocking=False):
raise Empty
try:
return self._buffer.get()
finally:
self._buffer_get_lock.rlock.release()
def _event_callback(self, channel):
event_time = time.time()
with self._buffer_put_lock.rlock:
self._buffer.put(event_time)
def _change_buffer(self, buffer: KeepNewQueue):
with self._buffer_put_lock.wlock:
with self._buffer_get_lock.wlock:
old_buffer = self._buffer
self._buffer = buffer
with suppress(Empty):
while True:
self._buffer.put(old_buffer.get())
class GPIOManager(AbstractContextManager):
def __init__(self, mode):
self._mode = mode
@property
def mode(self):
return self._mode
def __enter__(self):
GPIO.setmode(self._mode)
return self
def __exit__(self, exc_type, exc_value, traceback):
GPIO.clenup()