-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathobserve.py
153 lines (135 loc) · 5.38 KB
/
observe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import argparse
import base64
import datetime
import json
import random
import subprocess
import sys
import threading
import time
import urllib.parse

import requests
# --- Command line -----------------------------------------------------------
parser = argparse.ArgumentParser()
parser.add_argument("-name", help="name of the signal group", type=str)
args = parser.parse_args()
if args.name is None:
    print("name is required")
    sys.exit(1)

# --- Look up the Thing and its datastreams ----------------------------------
# The name ends up inside a quoted string literal of the filter expression,
# so single quotes are doubled first and the result is percent-encoded.
# Without this, names containing spaces, quotes, '&' or '#' produce a broken
# (or different) query.
name_literal = urllib.parse.quote(args.name.replace("'", "''"), safe="")
url = (
    "https://tld.iot.hamburg.de/v1.1/Things"
    f"?%24Filter=name%20eq%20%27{name_literal}%27&$Expand=Datastreams"
)
# A timeout keeps the script from hanging forever on an unresponsive server;
# a timeout error propagates as an exception.
response = requests.get(url, timeout=30)
if response.status_code != 200:
    print("Error: " + str(response.status_code))
    sys.exit(1)

things = response.json()["value"]
if not things:
    # An unknown name yields an empty result list, not an HTTP error.
    print("Error: no Thing named " + args.name)
    sys.exit(1)

# Maps each observation topic to the datastream's layer name
# (compared later against "primary_signal", "cycle_second", "detector_car"
# and "detector_bike").
datastreams = {}
for datastream in things[0]["Datastreams"]:
    topic = f'v1.1/Datastreams({datastream["@iot.id"]})/Observations'
    datastreams[topic] = datastream["properties"]["layerName"]

# --- Subscription command for the observations ------------------------------
# One "-t" option per topic; "-i" gets a random value so that parallel runs
# do not pass the same one.
sub_udp = "mosquitto_sub -h tld.iot.hamburg.de -p 1883 -v "
sub_udp += f"-i {random.randint(0, 1000000)} "
sub_udp += " ".join(f"-t '{topic}'" for topic in datastreams)

# Most recent observation per layer; written by udp_thread, read by
# print_thread. None until the first message arrives.
last_primary_signal_observation = None
last_car_detector_observation = None
last_bike_detector_observation = None
def udp_thread():
    """Feed the observation caches from the ``sub_udp`` subscription.

    Echoes the command, starts it through the shell and reads its stdout
    line by line until the stream ends. Each line is expected to be
    ``<topic> <json payload>``; the topic is translated to its layer name via
    ``datastreams`` and the decoded payload is stored in the matching
    ``last_*_observation`` global. A ``cycle_second`` message only prints a
    marker.
    """
    global last_primary_signal_observation
    global last_car_detector_observation
    global last_bike_detector_observation

    sys.stdout.write(sub_udp + "\n")
    process = subprocess.Popen(sub_udp, stdout=subprocess.PIPE, shell=True)
    for line in iter(process.stdout.readline, b""):
        # Split on the FIRST space only: the payload is JSON and may itself
        # contain spaces.
        topic, separator, payload = line.decode("utf-8").partition(" ")
        if not separator:
            # No payload on this line; nothing to record.
            continue
        layer = datastreams.get(topic)
        if layer == "primary_signal":
            last_primary_signal_observation = json.loads(payload)
        elif layer == "cycle_second":
            # NOTE(review): the leading glyph looks mis-encoded in this copy
            # of the file; left as found.
            sys.stdout.write(" ๐ New cycle started")
        elif layer == "detector_car":
            last_car_detector_observation = json.loads(payload)
        elif layer == "detector_bike":
            # Previously stored into the car cache, which left the bike
            # reading permanently empty and overwrote the car reading.
            last_bike_detector_observation = json.loads(payload)
# Second subscription, run alongside the observation one: the predictions
# published on the local broker for this signal group.
sub_predictions = "".join(
    (
        "mosquitto_sub -h localhost -p 1883 -v ",
        f"-i {random.randint(0, 1000000)} ",
        f"-t 'hamburg/{args.name}' ",
    )
)

# Most recent decoded prediction; None until the first message arrives.
last_prediction = None
def predictions_thread():
    """Keep ``last_prediction`` up to date from the ``sub_predictions`` command.

    Echoes the command, starts it through the shell and reads its stdout
    line by line until the stream ends. Each line is expected to be
    ``<topic> <json payload>``; the decoded payload replaces
    ``last_prediction`` and a marker is printed.
    """
    global last_prediction

    sys.stdout.write(sub_predictions + "\n")
    process = subprocess.Popen(sub_predictions, stdout=subprocess.PIPE, shell=True)
    for line in iter(process.stdout.readline, b""):
        # Split on the FIRST space only so a JSON payload containing spaces
        # is not cut short.
        _topic, separator, payload = line.decode("utf-8").partition(" ")
        if not separator:
            # No payload on this line; keep the previous prediction.
            continue
        last_prediction = json.loads(payload)
        # NOTE(review): the leading glyph looks mis-encoded in this copy of
        # the file; left as found.
        sys.stdout.write(" ๐ฎ New prediction")
# Glyph shown for each known signal state code.
# NOTE(review): these literals look mis-encoded in this copy of the file;
# they are reproduced as found.
_SIGNAL_GLYPHS = {
    0: "โซ๏ธ",
    1: "๐ด",
    2: "๐ก",
    3: "๐ข",
    4: "๐ ",  # Red amber
    5: "๐",  # Yellow flashing
    6: "โณ๏ธ",  # Green flashing
}


def int_to_color(result):
    """Return the display glyph for a signal state code.

    Codes 0-6 map to a fixed glyph. Any other value is returned as its
    string form, so unknown states stay visible in the output (previously
    the literal text ``{result}`` was returned because the ``f`` prefix was
    missing).
    """
    return _SIGNAL_GLYPHS.get(result, f"{result}")
def _format_detector(icon, observation):
    """Return `` (<icon> <value>%)`` with the value padded to three columns."""
    if not observation:
        return f" ({icon} 0%)"
    result = observation["result"]
    padding = (" " if result < 100 else "") + (" " if result < 10 else "")
    return f" ({icon} {padding}{result}%)"


def _format_prediction(prediction, now_utc):
    """Return the upcoming predicted states and the program id as one string.

    ``prediction["now"]`` and ``prediction["then"]`` are base64 strings whose
    decoded bytes are state codes, indexed by whole seconds elapsed since
    ``prediction["referenceTime"]``. Once ``now`` is used up, ``then`` is
    repeated cyclically.
    """
    # Keep one list entry per second. Joining to a string first makes len()
    # and slicing count code points, and several glyphs are longer than one.
    now_states = [int_to_color(c) for c in base64.b64decode(prediction["now"])]
    then_states = [int_to_color(c) for c in base64.b64decode(prediction["then"])]

    # The trailing "Z" marks the timestamp as UTC, so compare it with an
    # aware UTC clock rather than naive local time.
    reference_time = datetime.datetime.strptime(
        prediction["referenceTime"], "%Y-%m-%dT%H:%M:%SZ"
    ).replace(tzinfo=datetime.timezone.utc)
    index = int((now_utc - reference_time).total_seconds())

    if index < len(now_states):
        # The modulo keeps a negative index (reference time ahead of the
        # local clock) inside the vector; the guard avoids dividing by zero.
        start = index % len(now_states) if now_states else 0
        upcoming = now_states[start:]
    else:
        start = (index - len(now_states)) % len(then_states) if then_states else 0
        upcoming = then_states[start:]

    # A separator marks where the next full "then" cycle begins.
    # NOTE(review): the separator glyph looks mis-encoded in this copy of
    # the file; left as found.
    upcoming = upcoming + ["๐"] + then_states
    shown = "".join(upcoming[:60])  # Only show the first 60 entries
    return f"{shown} - P{prediction['programId']}"


# Print the state every second
def print_thread():
    """Write one status line per second, forever.

    The line holds the last observed signal state, the detector readings for
    the detector layers present in ``datastreams``, and the current
    prediction (or ``Prediction: None`` before the first one arrives).
    The globals are only read here, so no ``global`` declarations are needed.
    """
    # NOTE(review): the glyph literals below look mis-encoded in this copy of
    # the file; they are reproduced as found.
    while True:
        # Format the actual signal state
        actual_str = "โซ๏ธ"
        if last_primary_signal_observation:
            actual_str = f"{int_to_color(last_primary_signal_observation['result'])}"
        if "detector_car" in datastreams.values():
            actual_str += _format_detector("๐", last_car_detector_observation)
        if "detector_bike" in datastreams.values():
            actual_str += _format_detector("๐ฒ", last_bike_detector_observation)

        # Format the prediction. Take one snapshot so a concurrent update
        # cannot swap the object between the check and its use.
        prediction = last_prediction
        prediction_str = "Prediction: None"
        if prediction is not None:
            prediction_str = _format_prediction(
                prediction, datetime.datetime.now(datetime.timezone.utc)
            )

        # The literal was split across two lines in this copy; rejoined.
        sys.stdout.write(f"\n{actual_str} โฌ๏ธ {prediction_str}")
        time.sleep(1)
# Start the threads
# They are daemon threads because print_thread loops forever: as non-daemon
# threads they would keep the interpreter alive after Ctrl-C and the process
# would have to be killed.
threads = [
    threading.Thread(target=udp_thread, daemon=True),
    threading.Thread(target=predictions_thread, daemon=True),
    threading.Thread(target=print_thread, daemon=True),
]
for thread in threads:
    thread.start()
try:
    for thread in threads:
        thread.join()
except KeyboardInterrupt:
    # End the status line and let the interpreter exit; the daemon threads
    # are stopped with it.
    sys.stdout.write("\n")