forked from switchdoclabs/SDL_Pi_SkyWeather2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtestWirelessSensors.py
75 lines (59 loc) · 3.27 KB
/
testWirelessSensors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# test for WeatherSense SwitchDoc Labs Weather Sensors
# ---------------------------------------------------------------------------------------------------------------------------------------------------------------
# --------------------------------------------------------------------------------------------------------------------------------------------------------------
import sys
from subprocess import PIPE, Popen, STDOUT
from threading import Thread
#import json
import datetime
# ---------------------------------------------------------------------------------------------------------------------------------------------------------------
# Build the rtl_433 command line.
# Protocol IDs named by the original author:
#   146 = FT-020T WeatherRack2, 147 = F016TH SDL Temperature/Humidity Sensor
# The remaining IDs (148, 150-153) are passed through the same way.
# An earlier variant enabled only protocols 146 and 147 and omitted '-M level'.
print("Starting Wireless Read")
cmd = ['/usr/local/bin/rtl_433', '-q', '-M', 'level', '-F', 'json']
# Every protocol ID gets its own '-R <id>' pair, in ascending order.
cmd.extend(
    arg
    for protocol_id in ('146', '147', '148', '150', '151', '152', '153')
    for arg in ('-R', protocol_id)
)
# ---------------------------------------------------------------------------------------------------------------------------------------------------------------
# A few helper functions...
def nowStr():
    """Return the current local date and time as 'YYYY-MM-DD HH:MM:SS'."""
    stamp_format = '%Y-%m-%d %H:%M:%S'
    current = datetime.datetime.now()
    return current.strftime(stamp_format)
# ---------------------------------------------------------------------------------------------------------------------------------------------------------------
#stripped = lambda s: "".join(i for i in s if 31 < ord(i) < 127)
# A queue carries subprocess output from the reader thread to the main loop
# as it arrives. The module is named 'queue' on Python 3 and 'Queue' on
# Python 2; either way the names Queue and Empty end up bound here.
try:
    from queue import Queue, Empty  # Python 3.x
except ImportError:
    from Queue import Queue, Empty  # Python 2.x fallback
# True when the interpreter has the built-in 'posix' module.
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(src, out, queue):
    """Copy every line read from *out* onto *queue*, then close *out*.

    Intended to run on a worker thread so that a blocking ``readline`` never
    stalls the consumer of the queue.

    Args:
        src: Label stored alongside each line to identify where it came from.
        out: Binary stream to read; reading stops when ``readline`` returns
            ``b''`` (end of stream).
        queue: Object with a ``put`` method; receives ``(src, line)`` tuples.
    """
    try:
        for line in iter(out.readline, b''):
            queue.put((src, line))
    finally:
        # Close the stream even if reading or queueing raised, so the
        # underlying pipe is not leaked.
        out.close()
# Start the rtl_433 sub-process and a background thread that drains its output.
# stderr is merged into stdout on purpose: the original author found that
# stderr must be either ignored or merged because of a limitation/bug somewhere
# underneath "subprocess", and it took a while to find a reliable approach.
# NOTE(review): the pipe is read as bytes; bufsize=1 (line buffering) may not
# be honoured for binary pipes on newer Python 3 releases -- confirm.
q = Queue()
p = Popen(cmd, stdout=PIPE, stderr=STDOUT, bufsize=1, close_fds=ON_POSIX)
t = Thread(
    target=enqueue_output,
    kwargs={'src': 'stdout', 'out': p.stdout, 'queue': q},
)
t.daemon = True  # the reader thread must not keep the program alive on exit
t.start()
# ---------------------------------------------------------------------------------------------------------------------------------------------------------------
# Main loop: poll the reader queue forever, echo every line received from the
# sub-process and announce recognised WeatherSense sensors.
# pulse goes up by one for each one-second poll that yields nothing and down by
# one for each line received; nothing in this script reads it back.
pulse = 0
while True:
    # Other processing can occur here as needed...
    #sys.stdout.write('Made it to processing step. \n')
    try:
        # Wait at most one second so the loop keeps cycling when no data arrives.
        src, line = q.get(timeout=1)
        #print(line.decode())
    except Empty:
        pulse += 1
    else:  # got line
        pulse -= 1
        # The queue holds raw bytes from the sub-process (stdout merged with
        # stderr). Replace undecodable bytes instead of letting a
        # UnicodeDecodeError terminate the loop.
        sLine = line.decode('utf-8', 'replace')
        print(sLine)
        # See if the data is something we need to act on...
        if 'F007TH' in sLine or 'F016TH' in sLine:
            sys.stdout.write('WeatherSense Indoor T/H F016TH Found' + '\n')
            #sys.stdout.write('This is the raw data: ' + sLine + '\n')
        if 'FT0300' in sLine or 'FT020T' in sLine:
            sys.stdout.write('WeatherSense WeatherRack2 FT020T found' + '\n')
            #sys.stdout.write('This is the raw data: ' + sLine + '\n')
    # Push anything written during this pass out immediately.
    sys.stdout.flush()