-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspider.py
executable file
·255 lines (199 loc) · 9.24 KB
/
spider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#!/usr/bin/python3
##!env python3
# Installation Notes:
# ------------------
#
# The speakers are: Adafruit Speaker Bonnet for Raspberry Pi, see:
# https://learn.adafruit.com/adafruit-speaker-bonnet-for-raspberry-pi
#
# Driver load instructions are at:
# https://learn.adafruit.com/adafruit-speaker-bonnet-for-raspberry-pi/raspberry-pi-usage
#
# Sound player: mpg123
# sudo apt install mpg123
import asyncio
import datetime
import os
import RPi.GPIO as GPIO
import signal
from sys import exit
# Local imports
from globals import My_globals
from sound_board import Sound_board
# Set RPi ports
pir_pin = 12
led_RED = 13
led_GRN = 6
led_BLU = 26
wait_change = 0.1 # time to wait for pir to change before looking again.
ospid_file = "/home/pi/python/spider/ospid.txt"
#---------------------------------------------------------------
# functions that do things - non async
def eyes_setup(pin_no):
GPIO.setup(pin_no, GPIO.OUT) # activate output
pwm_eyes = GPIO.PWM(pin_no, 100) # duty cycle: 100Hz; max value allowed.
pwm_eyes.start(0)
return pwm_eyes
def make_eye_intensities(max_int):
global eyes_intensity
eyes_steps = 40
eyes_intensity = {int(((10**(r/eyes_steps)-1)*max_int/9)+0.99) for r in range(0,eyes_steps+1)}
# exponential series {0..max_intensity} - created as a set to remove duplicates
eyes_intensity = sorted(list(eyes_intensity)) # convert set to list to ensure sequencing.
# sort: since the set returns in undetermined order.
print(eyes_intensity)
return eyes_intensity
# Arm signal to end program.
def handler(signum, frame):
''' A sigint interrupt will get caught here, and will in effect be the same as
getting a ^C at the keyboard.'''
print("\nterm signal received")
raise KeyboardInterrupt
#---------------------------------------------------------------
# async functions
async def eyes_change(pin_no, UP=True):
eye_change_transition = 5 # seconds
delay = eye_change_transition / len(eyes_intensity) # seconds
direction = 1 if UP else -1
for eyes_level in eyes_intensity[::direction]:
pin_no.ChangeDutyCycle(eyes_level)
await asyncio.sleep(delay)
async def flash_GB():
''' Flash Green/Blue LEDs in the eyes, and the white string of LEDs.'''
slow_flash = 5 # seconds between flashes during quiet period
fast_flash = 0.5 # seconds between flashes when animation_active is True
print("flash_GB co-routine")
seconds = slow_flash # Force flash on first entry
while my_globals.spider_parms["END_REQUEST"] == False:
if my_globals.animation_active or seconds >= slow_flash: # Flash every (slow_flash) seconds
GPIO.output(led_GRN, GPIO.HIGH)
GPIO.output(led_BLU, GPIO.HIGH)
await asyncio.sleep(0.1)
GPIO.output(led_GRN, GPIO.LOW)
GPIO.output(led_BLU, GPIO.LOW)
seconds = 0
else:
seconds += fast_flash
await asyncio.sleep(fast_flash)
print("flash_GB shutting down")
#---------------------------------------------------------------
async def track_myglobals():
print("track_myglobals co-routine")
# reload my_globals every 0.5 seconds
while True:
my_globals.get_spider_parms() # since they can change by key_parms
if my_globals.spider_parms["END_REQUEST"]:
break
await asyncio.sleep(0.5)
print("track_myglobals shutting down")
#---------------------------------------------------------------
async def track_pir():
def tick_str(ticks):
ticks = f"{ticks}"
return ticks + len(ticks) * "\b" # add equiv # of backspaces to back space over the number.
wait_on = 7 # wait this long (seconds) to see it's a real mamalian critter or zombie (not guaranteed)
max_ticks = int(wait_on * 1 / wait_change) # ticks in (wait_on) seconds.
print("track_pir co-routine")
# wait until PIR drops during initialization - may/may not be high
while GPIO.input(pir_pin):
await asyncio.sleep(wait_change)
# Generate max _eye_intensity list here just once.
max_eye_intensity = my_globals.spider_parms["MAX_INT"] # get MAX-INT so we can check for future changes
while True: # main loop - only exited with a "return" statement.
while my_globals.spider_parms["END_REQUEST"] == False:
# Wait for pir line to go high
if GPIO.input(pir_pin) == False:
await asyncio.sleep(wait_change) # still quiet
continue
# The PIR line is now high: check for a false positive
print("/ \b", end="", flush=True)
# Check spider parms
if my_globals.spider_parms["MAX_INT"] != max_eye_intensity:
max_eye_intensity = my_globals.spider_parms["MAX_INT"]
eyes_intensity = make_eye_intensities(max_eye_intensity)
my_globals.animation_active = True # Enable sound and start quick flashing
to_ticks = 0 # timeout_ticks
while to_ticks < max_ticks: # now let's wait to see if it drops within wait_on seconds.
if my_globals.spider_parms["END_REQUEST"]:
print("track_pir shutting down")
return
await asyncio.sleep(wait_change)
if GPIO.input(pir_pin):
to_ticks +=1 # pir line still high, increment ticks.
else: # line falls within short period... it's a false positive
break # print report and and return to while True loop.
else: # line is high, and we have exceeded max_ticks - there is a critter out there.
break # out of while True loop
# pir line is down, but we are less than the ticks timeout period - hence don't activate the spider.
report = "\\" + tick_str(to_ticks)
print(report, end="", flush=True)
my_globals.animation_active = False # disable quick flashing
else:
print("track_pir shutting down")
return
curr_time = datetime.datetime.now().strftime('%H:%M:%S')
print("\n>Rising edge detected on port", str(pir_pin) + ",", curr_time)
if my_globals.spider_parms["SOUND_ON"]:
# It is expected that the sound task will complete before the next time it is invoked.
t_sound = asyncio.create_task(sound_board.play_sound(my_globals))
await eyes_change(pwm_RED, UP=True) # slow operation
# Now wait for the pir pin to drop
while my_globals.spider_parms["END_REQUEST"] == False:
if GPIO.input(pir_pin):
await asyncio.sleep(wait_change) # still high, continue animation
else:
break # pir dropped. Shutdown animation.
else:
print("track_pir shutting down")
return
curr_time = datetime.datetime.now().strftime('%H:%M:%S')
print(">Falling edge detected on port", str(pir_pin) + ",", curr_time)
await eyes_change(pwm_RED, UP=False) # slow operation
my_globals.animation_active = False # Stop quick flashing
#---------------------------------------------------------------------------------
async def main():
spider_coros = [flash_GB(), track_pir(), track_myglobals()]
results = await asyncio.gather(*spider_coros)
#---------------------------------------------------------------------------------
#---------------------------------------------------------------------------------
# START HERE (Really!)
print() # cosmetic: new line for when started from run_spider.sh
# See if we're already running.
if os.path.isfile(ospid_file):
print("spider.py is already running.\nIf this message in error,",
"remove the file:", ospid_file)
exit()
# Set up globals and get spider_parms
my_globals = My_globals()
my_globals.animation_active = False # (also) initialized in My_globals.__init__
# Arm sigint to cause proceed to graceful end.
signal.signal(signal.SIGINT, handler)
# Initialize the Pi
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False) # Turn off warning if we didn't a GPIO cleanup
#Generate eye intensities
eyes_intensity = make_eye_intensities(my_globals.spider_parms["MAX_INT"])
pwm_RED = eyes_setup(led_RED) # activate pwm control
GPIO.setup(led_GRN, GPIO.OUT) # activate output
GPIO.setup(led_BLU, GPIO.OUT) # activate output
# Init PIR Control
GPIO.setup(pir_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # activate input
# Initialize sound board object
sound_board = Sound_board()
# Set a beacon to show we're running
# We store the process id here - used in 2 places:
# 1. We check in spider to see if we're already running. (code above)
# 2. Used in the kill_spider.sh script to send a SIGINT to the program.
with open(ospid_file, "w") as f:
f.write(str(os.getpid()))
#-----------------------------
#set up coroutines/tasks and start.
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nKeyboardInterrupt received")
#-----------------------------
# Cleanup our stuff
GPIO.cleanup() # clean up GPIO
if os.path.isfile(ospid_file):
os.remove(ospid_file)