-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathobject_server_cluster.test.py
72 lines (59 loc) · 3.65 KB
/
object_server_cluster.test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import unittest
from unittest.mock import MagicMock
from object_server_cluster import ObjectServerCluster
class TestObjectServerCluster(unittest.TestCase):
    """Unit tests for ``ObjectServerCluster`` driven by ``MagicMock`` peers.

    ``setUp`` builds a fresh cluster for every test. Tests that involve peer
    servers replace ``object_servers`` with two mocks so that the calls made
    on each peer's ``storage_backend`` can be inspected afterwards.
    """

    def setUp(self):
        """Create a fresh cluster so tests never share state."""
        self.object_server_cluster = ObjectServerCluster()

    def test_replicate_object(self):
        """``replicate_object`` writes the object exactly once to each peer."""
        object_key = "test_key"
        object_data = b"test_data"
        self.object_server_cluster.object_servers = [MagicMock(), MagicMock()]

        self.object_server_cluster.replicate_object(object_key, object_data)

        # Every peer is a fresh mock (never the cluster itself), so all of
        # them are checked unconditionally.
        for object_server in self.object_server_cluster.object_servers:
            object_server.storage_backend.write_object.assert_called_once_with(
                object_key, object_data
            )

    def test_delete_object(self):
        """``delete_object`` deletes the key exactly once on each peer."""
        object_key = "test_key"
        self.object_server_cluster.object_servers = [MagicMock(), MagicMock()]

        self.object_server_cluster.delete_object(object_key)

        for object_server in self.object_server_cluster.object_servers:
            object_server.storage_backend.delete_object.assert_called_once_with(
                object_key
            )

    def test_rebalance_objects(self):
        """``rebalance_objects`` delegates once to the load-balancing routine."""
        self.object_server_cluster.rebalance_objects_using_load_balancing = MagicMock()
        self.object_server_cluster.rebalance_interval = 1

        self.object_server_cluster.rebalance_objects()

        # assert_called_once() already pins call_count to exactly 1.
        self.object_server_cluster.rebalance_objects_using_load_balancing.assert_called_once()

    def test_exchange_heartbeats(self):
        """``exchange_heartbeats`` sends one heartbeat per other server."""
        self.object_server_cluster.send_heartbeat = MagicMock()
        self.object_server_cluster.heartbeat_interval = 1

        self.object_server_cluster.exchange_heartbeats()

        # NOTE(review): this test never sets ``object_servers``, so it relies
        # on whatever the constructor provides. The two assertions below only
        # agree when the cluster holds exactly two servers - confirm that is
        # the intended default, or install an explicit server list here.
        self.object_server_cluster.send_heartbeat.assert_called_once()
        self.assertEqual(
            self.object_server_cluster.send_heartbeat.call_count,
            len(self.object_server_cluster.object_servers) - 1,
        )

    def test_replicate_object_using_consensus(self):
        """Consensus replication writes the object exactly once to each peer."""
        object_key = "test_key"
        object_data = b"test_data"
        self.object_server_cluster.object_servers = [MagicMock(), MagicMock()]
        self.object_server_cluster.redundancy_factor = 2
        self.object_server_cluster.consensus_threshold = 0.5

        self.object_server_cluster.replicate_object_using_consensus(
            object_key, object_data
        )

        for object_server in self.object_server_cluster.object_servers:
            object_server.storage_backend.write_object.assert_called_once_with(
                object_key, object_data
            )

    def test_rebalance_objects_using_load_balancing(self):
        """Load-balanced rebalancing performs a single write on the cluster backend."""
        self.object_server_cluster.object_servers = [MagicMock(), MagicMock()]
        self.object_server_cluster.storage_backend.get_objects = MagicMock(
            return_value=[MagicMock(), MagicMock()]
        )

        self.object_server_cluster.rebalance_objects_using_load_balancing()

        # NOTE(review): ``write_object`` on the cluster's own backend is not
        # mocked in this test; reading ``call_count`` presumably requires the
        # backend (or that method) to already be a mock - confirm.
        self.assertEqual(
            self.object_server_cluster.storage_backend.write_object.call_count, 1
        )

    def test_use_redundancy(self):
        """``use_redundancy`` writes every stored object to every peer."""
        stored_objects = [MagicMock(), MagicMock()]
        self.object_server_cluster.object_servers = [MagicMock(), MagicMock()]
        self.object_server_cluster.storage_backend.objects = stored_objects
        self.object_server_cluster.redundancy_factor = 2

        self.object_server_cluster.use_redundancy()

        # Iterate the objects installed above, not the peer mock's
        # ``storage_backend.objects``: iterating a MagicMock attribute yields
        # nothing, which would make this test pass without asserting anything.
        # NOTE(review): assumes use_redundancy copies each object held by the
        # cluster's backend to each server - confirm against the implementation.
        for object_server in self.object_server_cluster.object_servers:
            for obj in stored_objects:
                # Several objects land on the same peer, so check that each
                # write happened rather than that there was only one call.
                object_server.storage_backend.write_object.assert_any_call(
                    obj.key, obj.data
                )
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()