Skip to content

Commit

Permalink
add cancel message check
Browse files Browse the repository at this point in the history
  • Loading branch information
LipuFei committed Sep 9, 2014
1 parent d0b423d commit 4e5b7ef
Showing 1 changed file with 80 additions and 1 deletion.
81 changes: 80 additions & 1 deletion community.py
Original file line number Diff line number Diff line change
Expand Up @@ -3305,6 +3305,7 @@ def check_cancel(self, messages):

assert all(message.name in (u"dispersy-cancel-own", u"dispersy-cancel-other") for message in messages)

to_check_message_list = list()
for message in messages:
if message.payload.packet is None:
# obtain the packet that we are attempting to cancel
Expand Down Expand Up @@ -3334,7 +3335,85 @@ def check_cancel(self, messages):
yield delay
continue

yield message
to_check_message_list.append(message)

# check the messages and only use the latest ones
category_dict = dict()
for message in to_check_message_list:
key = (message.payload.member.database_id, message.payload.global_time)
if key not in category_dict:
category_dict[key] = dict()
category_dict[key]["messages"] = list()
# load undone from database
undone, = self._dispersy._database.execute(
u"SELECT undone FROM sync WHERE community = ? AND member = ? AND global_time = ?",
(self.database_id, message.payload.member.database_id, message.payload.global_time)).next()
if undone != 0:
try:
mid, global_time = self._dispersy._database.execute(
u"SELECT member.mid, sync.global_time FROM sync"
u" JOIN meta_message ON meta_message.id == sync.meta_message"
u" JOIN member ON member.id == sync.member"
u" WHERE meta_message.name == 'CancelMessage' AND sync.id == ?", (undone,)).next()
except StopIteration:
# no previous cancel for this message
pass
else:
category_dict[key]["db_undone"] = (undone, mid, global_time)
category_dict[key]["messages"].append(message)

# for each message to be canceled, check the cancel messages member mid and global_time
# (the larger the later), and we only use the latest one.
for key, item in category_dict.iteritems():
#all_item_list = [(msg.payload.member.mid, msg.payload.global_time, None) for msg in item["messages"]]
# TODO: check if this is correct. I assume that there are no duplicate messages here, but I am not sure.
# If there are no duplicate messages here, we can use the statement above instead.
all_item_list = list()
for msg in item["messages"]:
assert (msg.payload.member.mid, msg.payload.global_time, None) not in all_item_list,\
"There are duplicate cancel messages here!!!"
all_item_list.append((msg.distribution.global_time, msg.authentication.member.mid, None))

# add the
if item.get("db_undone", None):
assert (item["db_undone"][2], item["db_undone"][1], None) not in all_item_list,\
"There are duplicate cancel messages with the one in DB!!!"
db_item = (item["db_undone"][2], item["db_undone"][1], item["db_undone"][0])
all_item_list.append(db_item)

# sort and get the latest one
all_item_list.sort()
the_latest_one = all_item_list[-1]
if the_latest_one[2] is not None:
# The latest one is the one in the database. Drop all messages and send back the latest one.
try:
the_latest_packet = self._dispersy._database.execute(u"SELECT packet FROM sync WHERE id = ?",
(the_latest_one[2],)).next()
except StopIteration:
pass
else:
for msg in item["messages"]:
self._dispersy._send_packets([msg.candidate], [str(the_latest_packet)],
self, "-caused by -check_cancel-")

# drop all messages
for msg in item["messages"]:
drop = DropMessage(msg, "this cancel message is not the latest")
yield drop
else:
# yield the latest message and send it to other people.
the_latest_msg = None
for msg in item["messages"]:
if msg.distribution.global_time == the_latest_one[0] and \
msg.authentication.member.mid == the_latest_one[1]:
the_latest_msg = msg
break
for msg in item["messages"]:
if msg != the_latest_msg:
self._dispersy._send_packets([msg.candidate], [str(the_latest_msg.packet)],
self, "-caused by -check_cancel-")

yield the_latest_msg

def on_cancel(self, messages):
"""
Expand Down

0 comments on commit 4e5b7ef

Please sign in to comment.