-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
132 lines (105 loc) · 5.31 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import multiprocessing
import sys
from multiprocessing.managers import DictProxy
import broadcast_listener as b_listen
import bully_algorithm
import device_info
import discovery
import file_tcp_init_listener
import file_tcp_listener as f_listen
import file_tcp_o_multicast_listener as ordered_listen
import file_tcp_r_multicast_listener as reliable_listen
import heartbeat as hb
import monitor_local_folder
import shared_dict_helper
def establish_listeners(_device_info_static: device_info.DeviceInfoStatic,
_device_info_dynamic: device_info.DeviceInfoDynamic,
_election_queue: multiprocessing.Queue,
_require_queue: multiprocessing.Queue,
_delivered_queue: multiprocessing.Queue,
_shared_dict: DictProxy,
_lock):
_listeners = []
p_init_folder_listener = file_tcp_init_listener.FileInitListener(_device_info_static, _shared_dict, _lock)
_listeners.append(p_init_folder_listener)
p_init_folder_listener.start()
p_broadcast_listen = b_listen.BroadcastListener(
_device_info_static, _device_info_dynamic, _election_queue, _require_queue, _shared_dict, _lock)
_listeners.append(p_broadcast_listen)
p_broadcast_listen.start()
r_deliver_queue = multiprocessing.Queue()
o_deliver_queue = multiprocessing.Queue()
p_reliable_multicast_listen = reliable_listen.ReliableMulticastListener(
_device_info_static, _device_info_dynamic, r_deliver_queue, _shared_dict, _lock)
_listeners.append(p_reliable_multicast_listen)
p_reliable_multicast_listen.start()
p_ordered_multicast_listen = ordered_listen.OrderedMulticastListener(
_device_info_static, _device_info_dynamic, r_deliver_queue, o_deliver_queue, _delivered_queue, _shared_dict, _lock)
_listeners.append(p_ordered_multicast_listen)
p_ordered_multicast_listen.start()
p_file_listen = f_listen.FileListener(
_device_info_static, _device_info_dynamic, o_deliver_queue, _shared_dict, _lock)
_listeners.append(p_file_listen)
p_file_listen.start()
return _listeners
def start_bully(_device_info_static: device_info.DeviceInfoStatic,
_device_info_dynamic: device_info.DeviceInfoDynamic,
_election_queue: multiprocessing.Queue,
_shared_dict: DictProxy,
_lock):
_p_bully = multiprocessing.Process(target=bully_algorithm.BullyAlgorithm, args=(
_device_info_static, _device_info_dynamic, _election_queue, _shared_dict, _lock))
_p_bully.daemon = True
_p_bully.start()
return _p_bully
def start_folder_monitor(_device_info_static: device_info.DeviceInfoStatic,
_device_info_dynamic: device_info.DeviceInfoDynamic,
_require_queue: multiprocessing.Queue,
_delivered_queue: multiprocessing.Queue,
_shared_dict: DictProxy,
_lock):
_p_monitor = multiprocessing.Process(target=monitor_local_folder.FolderMonitor, args=(
_device_info_static, _device_info_dynamic, _require_queue, _delivered_queue, _shared_dict, _lock))
_p_monitor.daemon = True
_p_monitor.start()
return _p_monitor
def start_heartbeat(_device_info_static,
_shared_dict: DictProxy,
_lock,
_interval: int):
_heartbeat = multiprocessing.Process(target=hb.Heartbeat, args=(
_device_info_static, _shared_dict, _lock, _interval))
_heartbeat.daemon = True
_heartbeat.start()
return _heartbeat
if __name__ == '__main__':
if len(sys.argv) > 1:
input_id = int(sys.argv[1])
input_path = str(sys.argv[2])
device_info_static, device_info_dynamic = device_info.initialise_myself(input_id, input_path)
else:
device_info_static, device_info_dynamic = device_info.initialise_myself()
dynamic_manager = multiprocessing.Manager()
lock = dynamic_manager.Lock()
shared_device_info_dynamic = dynamic_manager.dict()
shared_dict_helper.initialise_shared_dict(shared_device_info_dynamic, lock, device_info_dynamic)
election_queue = multiprocessing.Queue()
require_queue = multiprocessing.Queue()
delivered_queue = multiprocessing.Queue()
listeners = establish_listeners(
device_info_static, device_info_dynamic, election_queue, require_queue,
delivered_queue, shared_device_info_dynamic, lock)
p_discovery = multiprocessing.Process(target=discovery.discover_peers, args=(
device_info_static, device_info_dynamic, shared_device_info_dynamic, lock))
p_discovery.start()
heartbeat = start_heartbeat(device_info_static, shared_device_info_dynamic, lock, _interval=5)
p_discovery.join()
# start monitoring file changes and elections after discovery
p_monitor = start_folder_monitor(
device_info_static, device_info_dynamic, require_queue, delivered_queue, shared_device_info_dynamic, lock)
p_bully = start_bully(device_info_static, device_info_dynamic, election_queue, shared_device_info_dynamic, lock)
# device_info_dynamic = shared_dict.get("device_info_dynamic")
device_info_dynamic.get_update_from_shared_dict(shared_device_info_dynamic)
device_info_dynamic.print_info()
for listener in listeners:
listener.join()