-
Notifications
You must be signed in to change notification settings - Fork 0
/
pbk
executable file
·92 lines (72 loc) · 2.53 KB
/
pbk
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python3
"""
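Report how many bikes and e-bikes are available at PubliBike stations.

Desktop notifications rely on plyer, which can be installed with:
    sudo apt install python3-plyer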
"""
import asyncio
from datetime import datetime, time
import sys
import os
import json
from pprint import pprint
import requests
from plyer import notification
# Vehicle type names; get_bikes_by_id compares each vehicle's
# vehicle["type"]["name"] against these to decide which counter to bump.
ebike_name = "E-Bike"
bike_name = "Bike"
async def get_bikes_by_id(id):
    """Count the bikes and e-bikes currently listed for one station.

    Args:
        id: Station identifier interpolated into the PubliBike REST URL.

    Returns:
        A ``(count_bikes, count_ebikes)`` tuple.

    Raises:
        RuntimeError: If the API answers with a status code other than 200.
        NotImplementedError: If a vehicle's type name is neither
            ``bike_name`` nor ``ebike_name``.
    """
    url = f"https://rest.publibike.ch/v1/public/stations/{id}"
    # requests.get blocks; running it in a worker thread keeps the event loop
    # free so that several station lookups can actually overlap.
    response = await asyncio.to_thread(
        requests.get,
        url,
        headers={"Accept-Language": "en"},
        timeout=10,  # never hang forever on an unresponsive server
    )
    if response.status_code != 200:
        # Fail loudly: without a payload there is nothing to count.
        raise RuntimeError(
            f"Request failed with status code: {response.status_code}"
        )
    data = response.json()

    count_ebikes = 0
    count_bikes = 0
    for vehicle in data["vehicles"]:
        type_name = vehicle["type"]["name"]
        if type_name == bike_name:
            count_bikes += 1
        elif type_name == ebike_name:
            count_ebikes += 1
        else:
            # Unknown vehicle type: show which vehicle it was, then abort.
            print(vehicle["name"])
            raise NotImplementedError(f"Unknown vehicle type: {type_name!r}")
    return count_bikes, count_ebikes
async def main():
    """Query every station in the global ``ids`` mapping.

    One task per station is started inside a single TaskGroup; the group is
    left only after all of them have finished.

    Returns:
        A dict mapping each station name to whatever ``get_bikes_by_id``
        returned for that station's id.
    """
    async with asyncio.TaskGroup() as group:
        pending = {
            station: group.create_task(get_bikes_by_id(station_id))
            for station, station_id in ids.items()
        }
    # Leaving the TaskGroup guarantees every task is done, so result() is safe.
    return {station: task.result() for station, task in pending.items()}
def send_notify(name, count_bikes, count_ebikes):
    """Show a desktop notification for a station when the conditions are met.

    Nothing is sent when the station has no vehicles at all. Otherwise a
    notification is shown only if the local time is between 16:30 and 20:30
    (inclusive) and there are at most 15 e-bikes.

    Args:
        name: Station name used in the notification title.
        count_bikes: Number of bikes, shown in the message body.
        count_ebikes: Number of e-bikes, shown in the title.
    """
    station_is_empty = (count_bikes + count_ebikes) == 0
    if station_is_empty:
        return

    window_opens, window_closes = time(16, 30), time(20, 30)
    current = datetime.now().time()
    inside_window = window_opens <= current <= window_closes
    if not (inside_window and count_ebikes <= 15):
        return

    notification.notify(
        title=f"{count_ebikes} ebikes left at {name}.",
        message=f"{count_bikes} bikes left.",
        timeout=10,
    )
def print_results(counts, notify=False):
    """Print the bike and e-bike counts of every station.

    Args:
        counts: Mapping of station name to a ``(count_bikes, count_ebikes)``
            pair.
        notify: When true, stations whose name is in the global ``to_notify``
            are additionally passed to ``send_notify``.
    """
    for name, station_counts in counts.items():
        count_bikes, count_ebikes = station_counts
        print(f"For station: {name}.")
        print(f"Bikes: {count_bikes}, E-Bikes: {count_ebikes}")
        if not notify:
            continue
        if name in to_notify:
            send_notify(name, count_bikes, count_ebikes)
if __name__ == "__main__":
    # Read the station configuration from the environment, then fetch and
    # print the counts for every configured station.
    try:
        # Requires a variable exported in a .zshrc such as:
        # export PUBLIBIKE_IDS='{"SOME_NAME": 123, "SOME_OTHER": 789}'
        ids = json.loads(os.environ["PUBLIBIKE_IDS"])
        # Optional (defaults to an empty list); exported in a .zshrc such as:
        # export PUBLIBIKE_NOTIFY='["SOME_NAME"]'
        to_notify = json.loads(os.environ.get("PUBLIBIKE_NOTIFY", "[]"))
    except KeyError as e:
        # PUBLIBIKE_IDS is mandatory; explain instead of dumping a traceback.
        print(f"Missing required environment variable: {e.args[0]}")
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        sys.exit(1)
    counts = asyncio.run(main())
    # NOTE(review): print_results is called with its default notify=False, so
    # to_notify is loaded but never consulted — confirm whether notify=True
    # was intended here.
    print_results(counts)