-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdispatch.py
73 lines (54 loc) · 1.99 KB
/
dispatch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import queue
import enum
import redis
import threading
from monitor import QueryMonitor
# TODO: Find a safer way than eval() to rebuild the stored query-URL set read back from Redis.
# Index of the Redis logical database reserved for this application.
REDIS_DB_NUMBER = 10

# Redis key under which the collection of monitored query URLs is stored.
ALL_QUERIES_KEY = "craigslist query urls"
class Command(enum.Enum):
    """Kinds of request that can be placed on a dispatcher's command queue."""

    # Explicit values match what enum.auto() assigns: 1, 2, ... in order.
    ADD = 1      # start monitoring a URL
    REMOVE = 2   # stop monitoring a URL
class Dispatch(threading.Thread):
    """Thread that owns the set of query URLs and one QueryMonitor per URL.

    The URL set is persisted in Redis under ``ALL_QUERIES_KEY`` and reloaded
    whenever a ``Dispatch`` is constructed.  Changes are requested by putting
    ``(Command, url)`` tuples on ``command_queue``; ``run`` consumes them one
    at a time.
    """

    def __init__(self, notify_queue: queue.Queue):
        """Load the persisted URL set and start a monitor for each URL.

        Args:
            notify_queue: Queue passed unchanged to every ``QueryMonitor``
                this dispatcher creates.
        """
        super().__init__()
        # Use the module constant rather than repeating the literal 10.
        self.redis_cli = redis.StrictRedis(db=REDIS_DB_NUMBER)
        self.command_queue = queue.Queue()
        self.notify_queue = notify_queue

        stored = self.redis_cli.get(ALL_QUERIES_KEY)
        if stored is None:
            # Nothing stored yet: start empty and create the key.
            self.query_urls = set()
            self._persist_urls()
        else:
            # SECURITY: eval() executes whatever is stored under the key, so
            # anyone able to write to this Redis database can run arbitrary
            # code here.  Kept for compatibility with already-stored data;
            # replace with a safe format (e.g. JSON) together with a data
            # migration.
            self.query_urls = set(eval(stored))

        self.active_monitors = []
        self.make_monitors()

    def _persist_urls(self):
        """Store the current URL set in Redis as its ``repr`` string."""
        # redis-py 3.0+ raises DataError for values that are not
        # bytes/str/int/float; older versions stringified them implicitly.
        # Converting explicitly yields the same stored text on both, and it
        # is the text that __init__ evaluates when loading.
        self.redis_cli.set(ALL_QUERIES_KEY, repr(self.query_urls))

    def make_monitors(self):
        """Create, start and track a ``QueryMonitor`` for every known URL."""
        for url in self.query_urls:
            monitor = QueryMonitor(url, self.notify_queue)
            monitor.start()
            self.active_monitors.append(monitor)

    def run(self):
        """Process ``(command, url)`` tuples from ``command_queue`` forever.

        Tuples whose command is neither ``Command.ADD`` nor
        ``Command.REMOVE`` are ignored.
        """
        while True:
            command, url = self.command_queue.get()
            if command == Command.ADD:
                self.add_monitor(url)
            elif command == Command.REMOVE:
                print('Removing', url)
                self.remove_monitor(url)

    def add_monitor(self, url):
        """Start a monitor for ``url`` and persist the updated URL set."""
        monitor = QueryMonitor(url, self.notify_queue)
        monitor.start()
        self.query_urls.add(url)
        self._persist_urls()
        self.active_monitors.append(monitor)

    def remove_monitor(self, url):
        """Stop the first monitor whose ``feed_url`` equals ``url``.

        The URL is dropped from the persisted set whether or not a monitor
        was found.  An unknown URL only produces a message; it does not
        raise.
        """
        for index, monitor in enumerate(self.active_monitors):
            if monitor.feed_url == url:
                monitor.stop()
                # Safe to delete during iteration: the loop exits right away.
                del self.active_monitors[index]
                break
        else:
            print('No monitor found for', url)

        # discard() instead of remove(): a KeyError for an untracked URL
        # would otherwise propagate out of run() and end the dispatch thread.
        self.query_urls.discard(url)
        self._persist_urls()