"""
The community module provides the Community base class that should be used when a new Community is
implemented. It provides a simplified interface between the Dispersy instance and a running
Community instance.
@author: Boudewijn Schoon
@organization: Technical University Delft
@contact: [email protected]
"""
from abc import ABCMeta, abstractmethod
from collections import defaultdict, OrderedDict
from itertools import islice, groupby
import logging
from math import ceil
from random import random, Random, randint, shuffle, uniform
from time import time
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import LoopingCall, deferLater
from twisted.python.threadable import isInIOThread
from .authentication import NoAuthentication, MemberAuthentication, DoubleMemberAuthentication
from .bloomfilter import BloomFilter
from .candidate import Candidate, WalkCandidate
from .conversion import BinaryConversion, DefaultConversion, Conversion
from .destination import CommunityDestination, CandidateDestination
from .distribution import (SyncDistribution, GlobalTimePruning, LastSyncDistribution, DirectDistribution,
FullSyncDistribution)
from .exception import ConversionNotFoundException, MetaNotFoundException
from .member import DummyMember, Member
from .message import (BatchConfiguration, Message, Packet, DropMessage, DelayMessageByProof,
DelayMessageByMissingMessage, DropPacket, DelayPacket, DelayMessage)
from .payload import (AuthorizePayload, RevokePayload, UndoPayload, DestroyCommunityPayload, DynamicSettingsPayload,
IdentityPayload, MissingIdentityPayload, IntroductionRequestPayload, IntroductionResponsePayload,
PunctureRequestPayload, PuncturePayload, MissingMessagePayload, MissingSequencePayload,
MissingProofPayload, SignatureRequestPayload, SignatureResponsePayload)
from .requestcache import RequestCache, SignatureRequestCache, IntroductionRequestCache
from .resolution import PublicResolution, LinearResolution, DynamicResolution
from .statistics import CommunityStatistics
from .taskmanager import TaskManager
from .timeline import Timeline
from .util import runtime_duration_warning, attach_runtime_statistics, deprecated, is_valid_address
DOWNLOAD_MM_PK_INTERVAL = 15.0
FAST_WALKER_CANDIDATE_TARGET = 15
FAST_WALKER_MAX_NEW_ELIGIBLE_CANDIDATES = 10
FAST_WALKER_STEPS = 15
FAST_WALKER_STEP_INTERVAL = 2.0
PERIODIC_CLEANUP_INTERVAL = 5.0
TAKE_STEP_INTERVAL = 5
logger = logging.getLogger(__name__)
class SyncCache(object):
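"""
Caches the parameters of the most recently created sync bloom filter so it can be
reused across several introduction requests, together with counters tracking how
often it was reused and how many responses it yielded.
"""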
def __init__(self, time_low, time_high, modulo, offset, bloom_filter):
self.time_low = time_low
self.time_high = time_high
self.modulo = modulo
self.offset = offset
self.bloom_filter = bloom_filter
self.times_used = 0
self.responses_received = 0
self.candidate = None
class DispersyInternalMessage(object):
pass
class DispersyDuplicatedUndo(DispersyInternalMessage):
name = candidate = u"_DUPLICATED_UNDO_"
def __init__(self, low_message, high_message):
self.low_message = low_message
self.high_message = high_message
class Community(TaskManager):
__metaclass__ = ABCMeta
# Probability steps to get a sync skipped if the previous one was empty
_SKIP_CURVE_STEPS = [0, 0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
_SKIP_STEPS = len(_SKIP_CURVE_STEPS)
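# Worked example: _sync_cache_skip_count counts consecutive empty syncs, and the
# probability of skipping the next sync is _SKIP_CURVE_STEPS[count - 1]. The first
# two empty syncs therefore never cause a skip; from the third onwards the skip
# probability rises by 0.1 per empty sync, capping at 0.9.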
@classmethod
def get_classification(cls):
"""
Describes the community type. Should be the same across compatible versions.
@rtype: unicode
"""
return cls.__name__.decode("UTF-8")
@classmethod
def create_community(cls, dispersy, my_member, *args, **kargs):
"""
Create a new community owned by my_member.
Each unique community that exists out in the world is identified by a public/private key
pair. When the create_community method is called, such a key pair is generated.
Furthermore, my_member will be granted permission to use all the messages that the community
provides.
@param dispersy: The Dispersy instance where this community will attach itself to.
@type dispersy: Dispersy
@param my_member: The Member that will be granted Permit, Authorize, and Revoke for all
messages.
@type my_member: Member
@param args: optional arguments that are passed to the community constructor.
@type args: tuple
@param kargs: optional keyword arguments that are passed to the community constructor.
@type kargs: dictionary
@return: The created community instance.
@rtype: Community
"""
from .dispersy import Dispersy
assert isinstance(dispersy, Dispersy), type(dispersy)
assert isinstance(my_member, Member), type(my_member)
assert my_member.public_key, my_member.database_id
assert my_member.private_key, my_member.database_id
assert isInIOThread()
master = dispersy.get_new_member(u"high")
# new community instance
community = cls.init_community(dispersy, master, my_member, *args, **kargs)
# create the dispersy-identity for the master member
message = community.create_identity(sign_with_master=True)
# authorize MY_MEMBER
permission_triplets = []
message_names = (u"dispersy-authorize", u"dispersy-revoke", u"dispersy-undo-own", u"dispersy-undo-other")
for message in community.get_meta_messages():
# grant all permissions for messages that use LinearResolution or DynamicResolution
if isinstance(message.resolution, (LinearResolution, DynamicResolution)):
for allowed in (u"authorize", u"revoke", u"permit"):
permission_triplets.append((my_member, message, allowed))
# ensure that undo_callback is available
if message.undo_callback:
# we do not support undo permissions for authorize, revoke, undo-own, and
# undo-other (yet)
if not message.name in message_names:
permission_triplets.append((my_member, message, u"undo"))
# grant authorize, revoke, and undo permission for messages that use PublicResolution
# and SyncDistribution. Why? The undo permission allows nodes to revoke a specific
# message that was gossiped around. The authorize permission is required to grant other
# nodes the undo permission. The revoke permission is required to remove the undo
# permission. The permit permission is not required as the message uses
# PublicResolution and is hence permitted regardless.
elif isinstance(message.distribution, SyncDistribution) and isinstance(message.resolution, PublicResolution):
# ensure that undo_callback is available
if message.undo_callback:
# we do not support undo permissions for authorize, revoke, undo-own, and
# undo-other (yet)
if not message.name in message_names:
for allowed in (u"authorize", u"revoke", u"undo"):
permission_triplets.append((my_member, message, allowed))
if permission_triplets:
community.create_authorize(permission_triplets, sign_with_master=True, forward=False)
return community
@classmethod
def get_master_members(cls, dispersy):
from .dispersy import Dispersy
assert isinstance(dispersy, Dispersy), type(dispersy)
assert isInIOThread()
logger.debug("retrieving all master members owning %s communities", cls.get_classification())
execute = dispersy.database.execute
return [dispersy.get_member(public_key=str(public_key)) if public_key else dispersy.get_member(mid=str(mid))
for mid, public_key,
in list(execute(u"SELECT m.mid, m.public_key FROM community AS c JOIN member AS m ON m.id = c.master"
u" WHERE c.classification = ?",
(cls.get_classification(),)))]
@classmethod
def init_community(cls, dispersy, master, my_member, *args, **kargs):
"""
Initializes a new community, using master as the identifier and my_member as the
public/private keypair to be used when sending messages.
Each community is identified by the hash of the public key of the master member.
This member is created in the create_community method.
@param dispersy: The Dispersy instance where this community will attach itself to.
@type dispersy: Dispersy
@param master: The master member that identifies the community.
@type master: DummyMember or Member
@param my_member: The my member that identifies you in this community.
@type my_member: Member
@param args: optional arguments that are passed to the community constructor.
@type args: tuple
@param kargs: optional keyword arguments that are passed to the community constructor.
@type kargs: dictionary
@return: The initialized community instance.
@rtype: Community
"""
from .dispersy import Dispersy
assert isinstance(dispersy, Dispersy), type(dispersy)
assert isinstance(my_member, Member), type(my_member)
assert my_member.public_key, my_member.database_id
assert my_member.private_key, my_member.database_id
assert isInIOThread()
# new community instance
community = cls(dispersy, master, my_member)
# add to dispersy
dispersy.attach_community(community)
community.initialize(*args, **kargs)
return community
def __init__(self, dispersy, master, my_member):
"""
Please never call the constructor of a community directly, always use
create_community or init_community.
@param dispersy: The Dispersy object.
@type dispersy: Dispersy
@param master: The master member that identifies the community.
@type master: DummyMember or Member
@param my_member: The my member that identifies you in this community.
@type my_member: Member
"""
assert isInIOThread()
from .dispersy import Dispersy
assert isinstance(dispersy, Dispersy), type(dispersy)
assert isinstance(master, DummyMember), type(master)
assert master.mid not in dispersy._communities
assert isinstance(my_member, Member), type(my_member)
assert my_member.public_key, my_member.database_id
assert my_member.private_key, my_member.database_id
assert isInIOThread()
super(Community, self).__init__()
self._logger = logging.getLogger(self.__class__.__name__)
# Dispersy
self._dispersy = dispersy
# community data
self._database_id = None
self._database_version = None
self._cid = master.mid
self._master_member = master
self._my_member = my_member
self._global_time = 0
self._candidates = OrderedDict()
self._statistics = CommunityStatistics(self)
self._last_sync_time = 0
# batch caching incoming packets
self._batch_cache = {}
# delayed list for incoming packet/messages which are delayed
self._delayed_key = defaultdict(list)
self._delayed_value = defaultdict(list)
self.meta_message_cache = {}
self._meta_messages = {}
self._conversions = []
self._nrsyncpackets = 0
self._do_pruning = False
self._sync_cache_skip_count = 0
self._acceptable_global_time_deadline = 0.0
self._request_cache = None
self._timeline = None
self._random = None
self._walked_candidates = None
self._stumbled_candidates = None
self._introduced_candidates = None
self._walk_candidates = None
self._fast_steps_taken = 0
self._sync_cache = None
def initialize(self):
assert isInIOThread()
self._logger.info("initializing: %s", self.get_classification())
self._logger.debug("master member: %s %s", self._master_member.mid.encode("HEX"),
"" if self._master_member.public_key else " (no public key available)")
# Do not immediately call the periodic cleanup LC to avoid an infinite recursion problem: init_community ->
# initialize -> invoke_func -> _get_latest_channel_message -> convert_packet_to_message -> get_community ->
# init_community
self.register_task("periodic cleanup", LoopingCall(self._periodically_clean_delayed)).start(PERIODIC_CLEANUP_INTERVAL, now=False)
try:
self._database_id, my_member_did, self._database_version = self._dispersy.database.execute(
u"SELECT id, member, database_version FROM community WHERE master = ?",
(self._master_member.database_id,)).next()
# if we're called with a different my_member, update the table to reflect this
if my_member_did != self._my_member.database_id:
self._dispersy.database.execute(u"UPDATE community SET member = ? WHERE master = ?",
(self._my_member.database_id, self._master_member.database_id))
except StopIteration:
self._dispersy.database.execute(
u"INSERT INTO community(master, member, classification) VALUES(?, ?, ?)",
(self._master_member.database_id, self._my_member.database_id, self.get_classification()))
self._database_id, self._database_version = self._dispersy.database.execute(
u"SELECT id, database_version FROM community WHERE master = ?",
(self._master_member.database_id,)).next()
self._logger.debug("database id: %d", self._database_id)
self._logger.debug("my member: %s", self._my_member.mid.encode("HEX"))
assert self._my_member.public_key, [self._database_id, self._my_member.database_id, self._my_member.public_key]
assert self._my_member.private_key, [self._database_id, self._my_member.database_id, self._my_member.private_key]
if not self._master_member.public_key and self.dispersy_enable_candidate_walker and self.dispersy_auto_download_master_member:
lc = LoopingCall(self._download_master_member_identity)
reactor.callLater(0, lc.start, DOWNLOAD_MM_PK_INTERVAL, now=True)
self.register_task("download master member identity", lc)
# define all available messages
self._initialize_meta_messages()
# we're only interested in the meta_messages, filter the meta_message_cache
for name in self.meta_message_cache.keys():
if name not in self._meta_messages:
del self.meta_message_cache[name]
# batched insert
update_list = []
for database_id, name, priority, direction in self._dispersy.database.execute(u"SELECT id, name, priority, direction FROM meta_message WHERE community = ?", (self._database_id,)):
meta_message_info = self.meta_message_cache.get(name)
if meta_message_info:
if priority != meta_message_info["priority"] or direction != meta_message_info["direction"]:
update_list.append((priority, direction, database_id))
self._meta_messages[name]._database_id = database_id
del self.meta_message_cache[name]
if update_list:
self._dispersy.database.executemany(u"UPDATE meta_message SET priority = ?, direction = ? WHERE id = ?",
update_list)
if self.meta_message_cache:
insert_list = []
for name, data in self.meta_message_cache.iteritems():
insert_list.append((self.database_id, name, data["priority"], data["direction"]))
self._dispersy.database.executemany(u"INSERT INTO meta_message (community, name, priority, direction) VALUES (?, ?, ?, ?)",
insert_list)
for database_id, name in self._dispersy.database.execute(u"SELECT id, name FROM meta_message WHERE community = ?", (self._database_id,)):
self._meta_messages[name]._database_id = database_id
# cleanup pre-fetched values
self.meta_message_cache = None
# define all available conversions
self._conversions = self.initiate_conversions()
if __debug__:
assert len(self._conversions) > 0, len(self._conversions)
assert all(isinstance(conversion, Conversion) for conversion in self._conversions), [type(conversion) for conversion in self._conversions]
# the global time. zero indicates no messages are available, messages must have global
# times that are higher than zero.
self._global_time, = self._dispersy.database.execute(u"SELECT MAX(global_time) FROM sync WHERE community = ?", (self._database_id,)).next()
if self._global_time is None:
self._global_time = 0
assert isinstance(self._global_time, (int, long))
self._acceptable_global_time_cache = self._global_time
self._logger.debug("global time: %d", self._global_time)
# the sequence numbers
for current_sequence_number, name in self._dispersy.database.execute(u"SELECT MAX(sync.sequence), meta_message.name FROM sync, meta_message WHERE sync.meta_message = meta_message.id AND sync.member = ? AND meta_message.community = ? GROUP BY meta_message.name", (self._my_member.database_id, self.database_id)):
if current_sequence_number:
self._meta_messages[name].distribution._current_sequence_number = current_sequence_number
# sync range bloom filters
self._sync_cache = None
self._sync_cache_skip_count = 0
if __debug__:
b = BloomFilter(self.dispersy_sync_bloom_filter_bits, self.dispersy_sync_bloom_filter_error_rate)
self._logger.debug("sync bloom: size: %d; capacity: %d; error-rate: %f",
int(ceil(b.size // 8)),
b.get_capacity(self.dispersy_sync_bloom_filter_error_rate),
self.dispersy_sync_bloom_filter_error_rate)
# assigns temporary cache objects to unique identifiers
self._request_cache = RequestCache()
# initial timeline. the timeline will keep track of member permissions
self._timeline = Timeline(self)
self._initialize_timeline()
# random seed, used for sync range
self._random = Random()
# Initialize all the candidate iterators
self._walked_candidates = self._iter_category(u'walk')
self._stumbled_candidates = self._iter_category(u'stumble')
self._introduced_candidates = self._iter_category(u'intro')
self._walk_candidates = self._iter_categories([u'walk', u'stumble', u'intro'])
# statistics...
self._statistics.update()
# turn on/off pruning
self._do_pruning = any(isinstance(meta.distribution, SyncDistribution) and
isinstance(meta.distribution.pruning, GlobalTimePruning)
for meta in self._meta_messages.itervalues())
try:
# check if we have already created the identity message
self.dispersy._database.execute(u"SELECT 1 FROM sync WHERE member = ? AND meta_message = ? LIMIT 1",
(self._my_member.database_id, self.get_meta_message
(u"dispersy-identity").database_id)).next()
self._my_member.add_identity(self)
except StopIteration:
# we haven't yet, do it now
self.create_identity()
# check/sanity check the database
self.dispersy_check_database()
from sys import argv
if "--sanity-check" in argv:
try:
self.dispersy.sanity_check(self)
except ValueError:
self._logger.exception("sanity check fail for %s", self)
# start walker, if needed
if self.dispersy_enable_candidate_walker:
self.register_task("start_walking",
reactor.callLater(self.database_id % 3, self.start_walking))
@property
def candidates(self):
"""
Dictionary containing sock_addr:Candidate pairs.
"""
return self._candidates
@property
def request_cache(self):
"""
The request cache instance responsible for maintaining identifiers and timeouts for outstanding requests.
@rtype: RequestCache
"""
return self._request_cache
@property
def statistics(self):
"""
The Statistics instance.
"""
return self._statistics
def _download_master_member_identity(self):
assert not self._master_member.public_key
self._logger.debug("using dummy master member")
try:
public_key, = self._dispersy.database.execute(u"SELECT public_key FROM member WHERE id = ?", (self._master_member.database_id,)).next()
except StopIteration:
pass
else:
if public_key:
self._logger.debug("%s found master member", self._cid.encode("HEX"))
self._master_member = self._dispersy.get_member(public_key=str(public_key))
assert self._master_member.public_key
self.cancel_pending_task("download master member identity")
else:
for candidate in islice(self.dispersy_yield_verified_candidates(), 1):
if candidate:
self._logger.debug("%s asking for master member from %s", self._cid.encode("HEX"), candidate)
self.create_missing_identity(candidate, self._master_member)
def _initialize_meta_messages(self):
assert isinstance(self._meta_messages, dict)
assert len(self._meta_messages) == 0
# obtain meta messages
for meta_message in self.initiate_meta_messages():
assert meta_message.name not in self._meta_messages
self._meta_messages[meta_message.name] = meta_message
if __debug__:
sync_interval = 5.0
for meta_message in self._meta_messages.itervalues():
if isinstance(meta_message.distribution, SyncDistribution) and meta_message.batch.max_window >= sync_interval:
self._logger.warning(
"when sync is enabled the batch window should be smaller than the walking interval, "
"otherwise you are likely to receive duplicate packets [%s]", meta_message.name)
def _initialize_timeline(self):
mapping = {}
for name in [u"dispersy-authorize", u"dispersy-revoke", u"dispersy-dynamic-settings"]:
try:
meta = self.get_meta_message(name)
mapping[meta.database_id] = meta.handle_callback
except MetaNotFoundException:
self._logger.warning("unable to load permissions from database [could not obtain %s]", name)
if mapping:
for packet, in list(self._dispersy.database.execute(u"SELECT packet FROM sync WHERE meta_message IN (" + ", ".join("?" for _ in mapping) + ") ORDER BY global_time, packet",
mapping.keys())):
message = self._dispersy.convert_packet_to_message(str(packet), self, verify=False)
if message:
self._logger.debug("processing %s", message.name)
mapping[message.database_id]([message], initializing=True)
else:
# TODO: when a packet conversion fails we must drop something, and preferably check
# all messages in the database again...
self._logger.error("invalid message in database [%s; %s]\n%s",
self.get_classification(), self.cid.encode("HEX"), str(packet).encode("HEX"))
@property
def dispersy_auto_load(self):
"""
When True, this community will automatically be loaded when a packet is received.
"""
# currently we query the database on every access; this value should be cached for efficiency
return bool(self._dispersy.database.execute(u"SELECT auto_load FROM community WHERE master = ?",
(self._master_member.database_id,)).next()[0])
@dispersy_auto_load.setter
def dispersy_auto_load(self, auto_load):
"""
Sets the auto_load flag for this community.
"""
assert isinstance(auto_load, bool)
self._dispersy.database.execute(u"UPDATE community SET auto_load = ? WHERE master = ?",
(1 if auto_load else 0, self._master_member.database_id))
@property
def dispersy_auto_download_master_member(self):
"""
Enable or disable automatic downloading of the dispersy-identity for the master member.
"""
return True
@property
def dispersy_enable_candidate_walker(self):
"""
Enable the candidate walker.
When True is returned, the take_step method will be called periodically. Otherwise
it will be ignored. The candidate walker is enabled by default.
"""
return True
@property
def dispersy_enable_fast_candidate_walker(self):
"""
Enable the fast candidate walker.
When True is returned, the take_step method will initially be called more often to boost
the number of candidates available at startup.
The candidate fast walker is disabled by default.
"""
return False
@property
def dispersy_enable_candidate_walker_responses(self):
"""
Enable the candidate walker responses.
When True is returned, the community will be able to respond to incoming
dispersy-introduction-request and dispersy-puncture-request messages. Otherwise these
messages are left undefined and will be ignored.
When dispersy_enable_candidate_walker returns True, this property must also return True.
The default value is to mirror self.dispersy_enable_candidate_walker.
"""
return self.dispersy_enable_candidate_walker
@property
def dispersy_enable_bloom_filter_sync(self):
"""
Enable the bloom filter synchronisation during the neighbourhood walking.
When True is returned, outgoing dispersy-introduction-request messages will get the chance to include a sync
bloom filter by calling Community.dispersy_claim_sync_bloom_filter(...).
When False is returned, outgoing dispersy-introduction-request messages will never include sync bloom filters
and Community.acceptable_global_time will return 2 ** 63 - 1, ensuring that all messages
that are delivered on-demand or incidentally will be accepted.
"""
return True
@property
def dispersy_sync_bloom_filter_error_rate(self):
"""
The error rate that is allowed within the sync bloom filter.
Having a higher error rate will allow for more items to be stored in the bloom filter,
allowing more items to be synced with each sync interval, although this has the
disadvantage that more false positives will occur.
A false positive will mean that if A sends a dispersy-sync message to B, B will incorrectly
believe that A already has certain messages. Each message has -error rate- chance of being
a false positive, and hence B will not be able to receive -error rate- percent of the
messages in the system.
This problem can be alleviated by having multiple bloom filters for each sync range with
different prefixes, because bloom filters with different prefixes are extremely likely (the
hash functions md5, sha1, shaxxx ensure this) to have false positives for different packets.
Hence, having two or three different bloom filters will ensure you will get all messages,
though it will take more rounds.
@rtype: float
"""
return 0.01
@property
def dispersy_sync_bloom_filter_bits(self):
"""
The size in bits of this bloom filter.
Note that the amount must be a multiple of eight.
The sync bloom filter is part of the dispersy-introduction-request message and hence must
fit within a single MTU. There are several numbers that need to be taken into account.
- A typical MTU is 1500 bytes
- A typical IP header is 20 bytes. However, the maximum IP header is 60 bytes (this
includes information for VPN, tunnels, etc.)
- The UDP header is 8 bytes
- The dispersy header is 2 + 20 + 1 + 20 + 8 = 51 bytes (version, cid, type, member,
global-time)
- The signature is usually 60 bytes. This depends on what public/private key was chosen.
The current value is: self._my_member.signature_length
- The other payload is 6 + 6 + 6 + 1 + 2 = 21 (destination-address, source-lan-address,
source-wan-address, advice+connection-type+sync flags, identifier)
- The sync payload uses 8 + 8 + 4 + 4 + 1 + 4 + 1 = 30 (time low, time high, modulo, offset,
function, bits, prefix)
"""
return (1500 - 60 - 8 - 51 - self._my_member.signature_length - 21 - 30) * 8
@property
def dispersy_sync_bloom_filter_strategy(self):
return self._dispersy_claim_sync_bloom_filter_largest
@property
def dispersy_sync_skip_enable(self):
return True # _sync_skip_
@property
def dispersy_sync_cache_enable(self):
return True # _cache_enable_
def dispersy_store(self, messages):
"""
Called after new MESSAGES have been stored in the database.
"""
if __debug__:
cached = 0
if self._sync_cache:
cache = self._sync_cache
for message in messages:
if (message.distribution.priority > 32 and
cache.time_low <= message.distribution.global_time <= cache.time_high and
(message.distribution.global_time + cache.offset) % cache.modulo == 0):
if __debug__:
cached += 1
# update cached bloomfilter to avoid duplicates
cache.bloom_filter.add(message.packet)
# if this message was received from the candidate we send the bloomfilter too, increment responses
if (cache.candidate and message.candidate and cache.candidate.sock_addr == message.candidate.sock_addr):
cache.responses_received += 1
if __debug__:
if cached:
self._logger.debug("%s] %d out of %d were part of the cached bloomfilter",
self._cid.encode("HEX"), cached, len(messages))
def dispersy_claim_sync_bloom_filter(self, request_cache):
"""
Returns a (time_low, time_high, modulo, offset, bloom_filter) or None.
"""
if self._sync_cache:
if self._sync_cache.responses_received > 0:
if self.dispersy_sync_skip_enable:
# We have received data, reset skip counter
self._sync_cache_skip_count = 0
if self.dispersy_sync_cache_enable and self._sync_cache.times_used < 100:
self._statistics.sync_bloom_reuse += 1
self._statistics.sync_bloom_send += 1
cache = self._sync_cache
cache.times_used += 1
# log the number of packets received before resetting the counter
self._logger.debug("%s reuse #%d (packets received: %d; %s)",
self._cid.encode("HEX"), cache.times_used, cache.responses_received,
hex(cache.bloom_filter._filter))
cache.responses_received = 0
cache.candidate = request_cache.helper_candidate
return cache.time_low, cache.time_high, cache.modulo, cache.offset, cache.bloom_filter
elif self._sync_cache.times_used == 0:
# Still no updates, gradually increment the skipping probability one notch
self._logger.debug("skip:%d -> %d received:%d", self._sync_cache_skip_count,
min(self._sync_cache_skip_count + 1, self._SKIP_STEPS),
self._sync_cache.responses_received)
self._sync_cache_skip_count = min(self._sync_cache_skip_count + 1, self._SKIP_STEPS)
if (self.dispersy_sync_skip_enable and
self._sync_cache_skip_count and
random() < self._SKIP_CURVE_STEPS[self._sync_cache_skip_count - 1]):
# Let's skip this one
self._logger.debug("skip: random() was <%f", self._SKIP_CURVE_STEPS[self._sync_cache_skip_count - 1])
self._statistics.sync_bloom_skip += 1
self._sync_cache = None
return None
sync = self.dispersy_sync_bloom_filter_strategy(request_cache)
if sync:
self._sync_cache = SyncCache(*sync)
self._sync_cache.candidate = request_cache.helper_candidate
self._statistics.sync_bloom_new += 1
self._statistics.sync_bloom_send += 1
self._logger.debug("%s new sync bloom (%d/%d~%.2f)", self._cid.encode("HEX"),
self._statistics.sync_bloom_reuse, self._statistics.sync_bloom_new,
round(1.0 * self._statistics.sync_bloom_reuse / self._statistics.sync_bloom_new, 2))
return sync
# instead of pivot + capacity, compare pivot - capacity and pivot + capacity to see which globaltime range is largest
@runtime_duration_warning(0.5)
@attach_runtime_statistics(u"{0.__class__.__name__}.{function_name}")
def _dispersy_claim_sync_bloom_filter_largest(self, request_cache):
if __debug__:
t1 = time()
syncable_messages = u", ".join(unicode(meta.database_id) for meta in self._meta_messages.itervalues() if isinstance(meta.distribution, SyncDistribution) and meta.distribution.priority > 32)
if syncable_messages:
if __debug__:
t2 = time()
acceptable_global_time = self.acceptable_global_time
bloom = BloomFilter(self.dispersy_sync_bloom_filter_bits, self.dispersy_sync_bloom_filter_error_rate, prefix=chr(int(random() * 256)))
capacity = bloom.get_capacity(self.dispersy_sync_bloom_filter_error_rate)
desired_mean = self.global_time / 2.0
lambd = 1.0 / desired_mean
from_gbtime = self.global_time - int(self._random.expovariate(lambd))
if from_gbtime < 1:
from_gbtime = int(self._random.random() * self.global_time)
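# Note: the exponential sample (mean global_time / 2) is usually small, so the
# pivot is biased towards recent global times while still occasionally covering
# old ranges; the uniform fallback handles overshoots below 1.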
if from_gbtime > 1 and self._nrsyncpackets >= capacity:
# use from_gbtime -1/+1 to include from_gbtime
right, rightdata = self._select_bloomfilter_range(request_cache, syncable_messages, from_gbtime - 1, capacity, True)
# if right did not get to capacity, then we have less than capacity items in the database
# skip left
if right[2] == capacity:
left, leftdata = self._select_bloomfilter_range(request_cache, syncable_messages, from_gbtime + 1, capacity, False)
left_range = (left[1] or self.global_time) - left[0]
right_range = (right[1] or self.global_time) - right[0]
if left_range > right_range:
bloomfilter_range = left
data = leftdata
else:
bloomfilter_range = right
data = rightdata
else:
bloomfilter_range = right
data = rightdata
if __debug__:
t3 = time()
else:
if __debug__:
t3 = time()
bloomfilter_range = [1, acceptable_global_time]
data, fixed = self._select_and_fix(request_cache, syncable_messages, 0, capacity, True)
if len(data) > 0 and fixed:
bloomfilter_range[1] = data[-1][0]
self._nrsyncpackets = capacity + 1
if __debug__:
t4 = time()
if len(data) > 0:
bloom.add_keys(str(packet) for _, packet in data)
if __debug__:
self._logger.debug("%s syncing %d-%d, nr_packets = %d, capacity = %d, packets %d-%d, pivot = %d",
self.cid.encode("HEX"), bloomfilter_range[0], bloomfilter_range[1],
len(data), capacity, data[0][0], data[-1][0], from_gbtime)
self._logger.debug("%s took %f (fakejoin %f, rangeselect %f, dataselect %f, bloomfill, %f",
self.cid.encode("HEX"), time() - t1, t2 - t1, t3 - t2, t4 - t3, time() - t4)
return (min(bloomfilter_range[0], acceptable_global_time), min(bloomfilter_range[1], acceptable_global_time), 1, 0, bloom)
if __debug__:
self._logger.debug("%s no messages to sync", self.cid.encode("HEX"))
elif __debug__:
self._logger.debug("%s NOT syncing no syncable messages", self.cid.encode("HEX"))
# note: use self.acceptable_global_time here; the local acceptable_global_time is
# only bound when there are syncable messages
return (1, self.acceptable_global_time, 1, 0, BloomFilter(8, 0.1, prefix='\x00'))
def _select_bloomfilter_range(self, request_cache, syncable_messages, global_time, to_select, higher=True):
data, fixed = self._select_and_fix(request_cache, syncable_messages, global_time, to_select, higher)
lowerfixed = True
higherfixed = True
# if we selected less than to_select
if len(data) < to_select:
# calculate how many still remain
to_select = to_select - len(data)
if to_select > 25:
if higher:
lowerdata, lowerfixed = self._select_and_fix(request_cache, syncable_messages, global_time + 1, to_select, False)
data = lowerdata + data
else:
higherdata, higherfixed = self._select_and_fix(request_cache, syncable_messages, global_time - 1, to_select, True)
data = data + higherdata
bloomfilter_range = [data[0][0], data[-1][0], len(data)]
# we can use the global_time as a min or max value for lower and upper bound
if higher:
# we selected items higher than global_time, make sure bloomfilter_range[0] is at least as low as global_time + 1
# we select all items higher than global_time, thus all items global_time + 1 are included
bloomfilter_range[0] = min(bloomfilter_range[0], global_time + 1)
# if not fixed and higher, then we have selected up to all known packets
if not fixed:
bloomfilter_range[1] = self.acceptable_global_time
if not lowerfixed:
bloomfilter_range[0] = 1
else:
# we selected items lower than global_time, make sure bloomfilter_range[1] is at least as high as global_time - 1
# we select all items lower than global_time, thus all items global_time - 1 are included
bloomfilter_range[1] = max(bloomfilter_range[1], global_time - 1)
if not fixed:
bloomfilter_range[0] = 1
if not higherfixed:
bloomfilter_range[1] = self.acceptable_global_time
return bloomfilter_range, data
def _select_and_fix(self, request_cache, syncable_messages, global_time, to_select, higher=True):
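# Fetches at most to_select rows adjacent to global_time; one extra row is
# requested to detect whether more data exists (fixed=True), and trailing rows
# sharing the boundary global_time are then dropped so the returned range never
# splits messages with equal global times.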
assert isinstance(syncable_messages, unicode)
if higher:
data = list(self._dispersy.database.execute(u"SELECT global_time, packet FROM sync WHERE meta_message IN (%s) AND undone = 0 AND global_time > ? ORDER BY global_time ASC LIMIT ?" % (syncable_messages),
(global_time, to_select + 1)))
else:
data = list(self._dispersy.database.execute(u"SELECT global_time, packet FROM sync WHERE meta_message IN (%s) AND undone = 0 AND global_time < ? ORDER BY global_time DESC LIMIT ?" % (syncable_messages),
(global_time, to_select + 1)))
fixed = False
if len(data) > to_select:
fixed = True
# drop the extra row, and any rows that share its global_time, so the boundary
# never splits messages with equal global times
global_time = data[-1][0]
del data[-1]
while data and data[-1][0] == global_time:
del data[-1]
if not higher:
data.reverse()
return data, fixed
# instead of pivot + capacity, compare pivot - capacity and pivot + capacity to see which globaltime range is largest
@runtime_duration_warning(0.5)
@attach_runtime_statistics(u"{0.__class__.__name__}.{function_name}")
def _dispersy_claim_sync_bloom_filter_modulo(self, request_cache):
syncable_messages = u", ".join(unicode(meta.database_id) for meta in self._meta_messages.itervalues() if isinstance(meta.distribution, SyncDistribution) and meta.distribution.priority > 32)
if syncable_messages:
bloom = BloomFilter(self.dispersy_sync_bloom_filter_bits, self.dispersy_sync_bloom_filter_error_rate, prefix=chr(int(random() * 256)))
capacity = bloom.get_capacity(self.dispersy_sync_bloom_filter_error_rate)
self._nrsyncpackets = list(self._dispersy.database.execute(u"SELECT count(*) FROM sync WHERE meta_message IN (%s) AND undone = 0 LIMIT 1" % (syncable_messages)))[0][0]
modulo = int(ceil(self._nrsyncpackets / float(capacity)))
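# Worked example (illustrative numbers): 10000 syncable packets with a filter
# capacity of 3000 gives modulo ceil(10000 / 3000.0) = 4, so each request
# advertises about a quarter of the packets, selected by a random offset in
# [0, modulo).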
if modulo > 1:
offset = randint(0, modulo - 1)
packets = list(str(packet) for packet, in self._dispersy.database.execute(u"SELECT sync.packet FROM sync WHERE meta_message IN (%s) AND sync.undone = 0 AND (sync.global_time + ?) %% ? = 0" % syncable_messages, (offset, modulo)))
else:
offset = 0
modulo = 1
packets = list(str(packet) for packet, in self._dispersy.database.execute(u"SELECT sync.packet FROM sync WHERE meta_message IN (%s) AND sync.undone = 0" % syncable_messages))
bloom.add_keys(packets)
self._logger.debug("%s syncing %d-%d, nr_packets = %d, capacity = %d, totalnr = %d",
self.cid.encode("HEX"), modulo, offset, self._nrsyncpackets, capacity, self._nrsyncpackets)
return (1, self.acceptable_global_time, modulo, offset, bloom)
else:
self._logger.debug("%s NOT syncing no syncable messages", self.cid.encode("HEX"))
return (1, self.acceptable_global_time, 1, 0, BloomFilter(8, 0.1, prefix='\x00'))
@property
def dispersy_sync_response_limit(self):
"""
The maximum number of bytes to send back per received dispersy-sync message.
@rtype: int
"""
return 5 * 1024
@property
def dispersy_missing_sequence_response_limit(self):
"""
The maximum number of bytes to send back per received dispersy-missing-sequence message.
@rtype: (int, int)
"""
return 10 * 1024
@property
def dispersy_acceptable_global_time_range(self):
return 10000
@property
def cid(self):
"""
The 20 byte sha1 digest of the public master key, in other words: the community identifier.
@rtype: string
"""
return self._cid
@property
def database_id(self):
"""
The number used to identify this community in the local Dispersy database.
@rtype: int or long
"""
return self._database_id
@property
def database_version(self):
return self._database_version
@property
def master_member(self):
"""
The community Member instance.
@rtype: Member
"""
return self._master_member
@property
def my_member(self):
"""
Our own Member instance that is used to sign the messages that we create.
@rtype: Member
"""
return self._my_member
@property
def dispersy(self):
"""
The Dispersy instance.
@rtype: Dispersy
"""
return self._dispersy
@property
def timeline(self):