From 9d911b5520ff040da750ae599b693aef18a29f4e Mon Sep 17 00:00:00 2001 From: Rob Ruigrok Date: Thu, 16 Apr 2015 13:42:16 +0200 Subject: [PATCH] Bugfixes and refactoring on circuit creation --- .../Core/Libtorrent/LibtorrentDownloadImpl.py | 14 +++++++++-- Tribler/Core/Libtorrent/LibtorrentMgr.py | 16 ------------- Tribler/Main/vwxGUI/home.py | 12 ++++++---- Tribler/Test/test_as_server.py | 2 ++ Tribler/Test/test_tunnel_community.py | 13 +++++++---- Tribler/community/tunnel/hidden_community.py | 23 ++++++++++--------- Tribler/community/tunnel/tunnel_community.py | 23 +++++++++++++++---- 7 files changed, 59 insertions(+), 44 deletions(-) diff --git a/Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py b/Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py index f9ede424227..c4e75f4acd4 100644 --- a/Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py +++ b/Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py @@ -266,7 +266,13 @@ def create_engine_wrapper(self, lm_network_engine_wrapper_created_callback, psta if not self.cew_scheduled: self.ltmgr = self.session.lm.ltmgr dht_ok = not isinstance(self.tdef, TorrentDefNoMetainfo) or self.ltmgr.is_dht_ready() - session_ok = self.ltmgr.tunnels_ready(self) == 1 + tunnel_community = self.ltmgr.trsession.lm.tunnel_community + if tunnel_community: + tunnels_ready = tunnel_community.tunnels_ready(self.get_hops(), self.get_def().is_anonymous()) + else: + tunnels_ready = 1 + + session_ok = tunnels_ready == 1 if not self.ltmgr or not dht_ok or not session_ok: self._logger.info(u"LTMGR/DHT/session not ready, rescheduling create_engine_wrapper") @@ -892,7 +898,11 @@ def network_get_state(self, usercallback, getpeerlist, sessioncalling=False): if self.dlstate != DLSTATUS_CIRCUITS: progress = self.progressbeforestop else: - progress = self.ltmgr.tunnels_ready(self) + tunnel_community = self.ltmgr.trsession.lm.tunnel_community + if tunnel_community: + progress = tunnel_community.tunnels_ready(self.get_hops(), self.get_def().is_anonymous()) + else: + progress = 1 ds = DownloadState(self, self.dlstate, self.error, progress) else: (status, stats, seeding_stats, logmsgs) = self.network_get_stats(getpeerlist) diff --git a/Tribler/Core/Libtorrent/LibtorrentMgr.py b/Tribler/Core/Libtorrent/LibtorrentMgr.py index 7317c94abcb..a9a2c2e85e2 100644 --- a/Tribler/Core/Libtorrent/LibtorrentMgr.py +++ b/Tribler/Core/Libtorrent/LibtorrentMgr.py @@ -126,22 +126,6 @@ def get_session(self, hops=0): return self.ltsessions[hops] - def tunnels_ready(self, download): - hops = download.get_hops() - if hops > 0: - tunnel_community = self.trsession.lm.tunnel_community - if tunnel_community: - if download.get_def().is_anonymous(): - current_hops = tunnel_community.circuits_needed.get(hops, 0) - tunnel_community.circuits_needed[hops] = max(1, current_hops) - return bool(tunnel_community.active_data_circuits(hops)) - else: - tunnel_community.circuits_needed[hops] = tunnel_community.settings.max_circuits - return min(1, len(tunnel_community.active_data_circuits(hops)) / - float(tunnel_community.settings.min_circuits)) - return 0 - return 1 - def shutdown(self): # Save DHT state dhtstate_file = open(os.path.join(self.trsession.get_state_dir(), DHTSTATE_FILENAME), 'w') diff --git a/Tribler/Main/vwxGUI/home.py b/Tribler/Main/vwxGUI/home.py index 80f76e19566..40d80d37e78 100644 --- a/Tribler/Main/vwxGUI/home.py +++ b/Tribler/Main/vwxGUI/home.py @@ -648,8 +648,8 @@ def AddComponents(self): self.circuit_list.InsertColumn(0, 'ID', wx.LIST_FORMAT_LEFT, 25) self.circuit_list.InsertColumn(1, 'Online', wx.LIST_FORMAT_RIGHT, 50) self.circuit_list.InsertColumn(2, 'Hops', wx.LIST_FORMAT_RIGHT, 45) - self.circuit_list.InsertColumn(3, u'Bytes \u2191', wx.LIST_FORMAT_RIGHT, 63) - self.circuit_list.InsertColumn(4, u'Bytes \u2193', wx.LIST_FORMAT_RIGHT, 63) + self.circuit_list.InsertColumn(3, u'Bytes \u2191', wx.LIST_FORMAT_RIGHT, 83) + self.circuit_list.InsertColumn(4, u'Bytes \u2193', wx.LIST_FORMAT_RIGHT, 83) self.circuit_list.InsertColumn(5, 'Uptime', wx.LIST_FORMAT_RIGHT, 54) self.circuit_list.setResizeColumn(0) self.circuit_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.OnItemSelected) @@ -660,7 +660,8 @@ def AddComponents(self): self.log_text = wx.TextCtrl(self, style=wx.TE_MULTILINE | wx.BORDER_SIMPLE | wx.HSCROLL & wx.VSCROLL) self.log_text.SetEditable(False) self.log_text.Show(self.fullscreen) - self.num_circuits_label = wx.StaticText(self, -1, "You have 0 circuit(s); 0 relay(s); 0 exit socket(s)") + self.num_circuits_label = wx.StaticText(self, -1, "You have 0 circuit(s); 0 relay(s); \ + 0 exit socket(s); 0 candidate(s)") self.vSizer = wx.BoxSizer(wx.VERTICAL) self.vSizer.Add(self.circuit_list, 1, wx.EXPAND | wx.RESERVE_SPACE_EVEN_IF_HIDDEN, 0) @@ -697,10 +698,11 @@ def OnUpdateCircuits(self, event): return if self.fullscreen: - self.num_circuits_label.SetLabel("You have %d circuit(s); %d relay(s); %d exit socket(s)" % + self.num_circuits_label.SetLabel("You have %d circuit(s); %d relay(s); %d exit socket(s); %d candidate(s)" % (len(self.tunnel_community.circuits), len(self.tunnel_community.relay_from_to), - len(self.tunnel_community.exit_sockets))) + len(self.tunnel_community.exit_sockets), + sum(1 for _ in self.tunnel_community.dispersy_yield_verified_candidates()))) new_circuits = dict(self.tunnel_community.circuits) self.circuits = {k: v for k, v in new_circuits.iteritems() if v.goal_hops == self.hops or self.hops < 0} diff --git a/Tribler/Test/test_as_server.py b/Tribler/Test/test_as_server.py index f681982a648..e899161aa0d 100644 --- a/Tribler/Test/test_as_server.py +++ b/Tribler/Test/test_as_server.py @@ -99,6 +99,7 @@ def tearDown(self, annotate=True): for dc in delayed_calls: self._logger.debug("> %s" % dc) self.assertFalse(delayed_calls, "The reactor was dirty when tearing down the test") + self.assertFalse(Session.has_instance(), 'A session instance is still present when tearing down the test') def tearDownCleanup(self): self.setUpCleanup() @@ -301,6 +302,7 @@ class TestGuiAsServer(TestAsServer): """ def setUp(self): + self.assertFalse(Session.has_instance(), 'A session instance is already present when setting up the test') AbstractServer.setUp(self, annotate=False) self.app = wx.GetApp() diff --git a/Tribler/Test/test_tunnel_community.py b/Tribler/Test/test_tunnel_community.py index 9510537f3ee..04d5ac2bf40 100644 --- a/Tribler/Test/test_tunnel_community.py +++ b/Tribler/Test/test_tunnel_community.py @@ -13,6 +13,7 @@ from Tribler.dispersy.util import blockingCallFromThread from Tribler.community.tunnel.tunnel_community import TunnelSettings from Tribler.community.tunnel.hidden_community import HiddenTunnelCommunity +from Tribler.dispersy.crypto import NoCrypto class TestTunnelCommunity(TestGuiAsServer): @@ -328,9 +329,9 @@ def cb_dht(info_hash, peers, source): self.CallConditional(60, dht.is_set, lambda: self.Call(5, lambda: start_download(tf)), 'Introduction point did not get announced') - self.startTest(setup_seeder) + self.startTest(setup_seeder, nr_relays=6, nr_exitnodes=4) - def startTest(self, callback, min_timeout=5, nr_relays=5, nr_exitnodes=3): + def startTest(self, callback, min_timeout=5, nr_relays=5, nr_exitnodes=3, crypto_enabled=True): from Tribler.Main import tribler_main tribler_main.FORCE_ENABLE_TUNNEL_COMMUNITY = True tribler_main.TUNNEL_COMMUNITY_DO_TEST = False @@ -341,11 +342,11 @@ def setup_proxies(): tunnel_communities = [] baseindex = 3 for i in range(baseindex, baseindex + nr_relays): # Normal relays - tunnel_communities.append(create_proxy(i, False)) + tunnel_communities.append(create_proxy(i, False, crypto_enabled)) baseindex += nr_relays + 1 for i in range(baseindex, baseindex + nr_exitnodes): # Exit nodes - tunnel_communities.append(create_proxy(i, True)) + tunnel_communities.append(create_proxy(i, True, crypto_enabled)) # Connect the proxies to the Tribler instance for community in self.lm.dispersy.get_communities(): @@ -367,7 +368,7 @@ def setup_proxies(): callback(tunnel_communities) - def create_proxy(index, become_exit_node): + def create_proxy(index, become_exit_node, crypto_enabled): from Tribler.Core.Session import Session self.setUpPreSession() @@ -393,6 +394,8 @@ def load_community(session): dispersy_member = dispersy.get_member(private_key=dispersy.crypto.key_to_bin(keypair)) settings = TunnelSettings(tribler_session=session) settings.do_test = False + if not crypto_enabled: + settings.crypto = NoCrypto() settings.become_exitnode = become_exit_node return dispersy.define_auto_load(HiddenTunnelCommunity, dispersy_member, (session, settings), load=True)[0] diff --git a/Tribler/community/tunnel/hidden_community.py b/Tribler/community/tunnel/hidden_community.py index 4f262843010..4d2513ef8fa 100644 --- a/Tribler/community/tunnel/hidden_community.py +++ b/Tribler/community/tunnel/hidden_community.py @@ -15,7 +15,7 @@ from Tribler.community.tunnel.payload import ( EstablishIntroPayload, IntroEstablishedPayload, EstablishRendezvousPayload, - RendezvousEstablishedPayload, KeyResponsePayload, KeyRequestPayload, + RendezvousEstablishedPayload, KeyResponsePayload, KeyRequestPayload, CreateE2EPayload, CreatedE2EPayload, LinkE2EPayload, LinkedE2EPayload) from Tribler.community.tunnel.routing import RelayRoute, RendezvousPoint, Hop @@ -51,8 +51,9 @@ def __init__(self, community, rp): self.rp = rp def on_timeout(self): - self._logger.debug("RPRequestCache: no response on establish-rendezvous (circuit %d)", self.rp.circuit_id) - self.community.remove_circuit(self.circuit.circuit_id, 'establish-rendezvous timeout') + self._logger.debug("RPRequestCache: no response on establish-rendezvous (circuit %d)", + self.rp.circuit.circuit_id) + self.community.remove_circuit(self.rp.circuit.circuit_id, 'establish-rendezvous timeout') class KeyRequestCache(RandomNumberCache): @@ -251,11 +252,11 @@ def check_key_request(self, messages): for message in messages: info_hash = message.payload.info_hash if not message.source.startswith(u"circuit_"): - if not info_hash in self.intro_point_for: + if info_hash not in self.intro_point_for: yield DropMessage(message, "not an intro point for this infohash") continue else: - if not info_hash in self.session_keys: + if info_hash not in self.session_keys: yield DropMessage(message, "not seeding this infohash") continue @@ -314,18 +315,18 @@ def on_create_e2e(self, messages): relay_circuit.tunnel_data(message.candidate.sock_addr, TUNNEL_PREFIX + message.packet) else: - self.create_rendevous_point( - DEFAULT_HOPS, lambda rendevous_point, message=message: self.create_created_e2e(rendevous_point, message)) + self.create_rendezvous_point( + DEFAULT_HOPS, lambda rendezvous_point, message=message: self.create_created_e2e(rendezvous_point, message)) - def create_created_e2e(self, rendevous_point, message): + def create_created_e2e(self, rendezvous_point, message): info_hash = message.payload.info_hash key = self.session_keys[info_hash] circuit = self.circuits[int(message.source[8:])] shared_secret, Y, AUTH = self.crypto.generate_diffie_shared_secret(message.payload.key, key) - rendevous_point.circuit.hs_session_keys = self.crypto.generate_session_keys(shared_secret) + rendezvous_point.circuit.hs_session_keys = self.crypto.generate_session_keys(shared_secret) rp_info_enc = self.crypto.encrypt_str( - encode((rendevous_point.rp_info, rendevous_point.cookie)), *self.get_session_keys(rendevous_point.circuit.hs_session_keys, EXIT_NODE)) + encode((rendezvous_point.rp_info, rendezvous_point.cookie)), *self.get_session_keys(rendezvous_point.circuit.hs_session_keys, EXIT_NODE)) meta = self.get_meta_message(u'created-e2e') response = meta.impl(distribution=(self.global_time,), payload=( @@ -477,7 +478,7 @@ def on_intro_established(self, messages): self.request_cache.pop(u"establish-intro", message.payload.identifier) self._logger.info("Got intro-established from %s", message.candidate) - def create_rendevous_point(self, hops, finished_callback): + def create_rendezvous_point(self, hops, finished_callback): def callback(circuit): # We got a circuit, now let's create a rendezvous point circuit_id = circuit.circuit_id diff --git a/Tribler/community/tunnel/tunnel_community.py b/Tribler/community/tunnel/tunnel_community.py index 2a6b80213b3..1a58ff0e2d6 100644 --- a/Tribler/community/tunnel/tunnel_community.py +++ b/Tribler/community/tunnel/tunnel_community.py @@ -39,6 +39,7 @@ from Tribler.dispersy.util import call_on_reactor_thread from Tribler.dispersy.requestcache import NumberCache, RandomNumberCache from Tribler.community.bartercast4.statistics import BartercastStatisticTypes, _barter_statistics +from Tribler.Core.CacheDB.sqlitecachedb import bin2str class CircuitRequestCache(NumberCache): @@ -413,6 +414,17 @@ def do_circuits(self): self.do_remove() + def tunnels_ready(self, hops, anonymous): + if hops > 0: + if anonymous: + current_hops = self.circuits_needed.get(hops, 0) + self.circuits_needed[hops] = max(1, current_hops) + return bool(self.active_data_circuits(hops)) + else: + self.circuits_needed[hops] = self.settings.max_circuits + return min(1, len(self.active_data_circuits(hops)) / float(self.settings.min_circuits)) + return 1 + def do_remove(self): # Remove circuits that are inactive / are too old / have transferred too many bytes. for key, circuit in self.circuits.items(): @@ -464,17 +476,17 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, max_ break if not required_exit: - self._logger.debug("Look for connectable exit node to set as required_exit for this circuit") + self._logger.debug("Look for exit node to set as required_exit for this circuit") # Each circuit's exit node should be a verified connectable exit node peer chosen by the circuit initiator for c in self.dispersy_yield_verified_candidates(): pubkey = c.get_member().public_key exit_candidate = self.exit_candidates[pubkey] - if exit_candidate.become_exit and self.candidate_is_connectable(c): - self._logger.debug("Valid exit node found for this circuit") + if exit_candidate.become_exit: + self._logger.debug("Valid exit candidate found for this circuit") required_exit = (c.sock_addr[0], c.sock_addr[1], pubkey) # Stop looking for a better alternative if the exit-node is not used for exiting in another circuit - if c.sock_addr not in hops: - self._logger.debug("Exit node not used in other circuits, best choice") + if c.sock_addr not in hops and self.candidate_is_connectable(c): + self._logger.debug("Exit node is connectable and not used in other circuits, that's prefered") break if not required_exit: @@ -1175,6 +1187,7 @@ def crypto_in(self, circuit_id, content, is_data=False): if circuit and len(circuit.hops) > 0: # Remove all the encryption layers for hop in self.circuits[circuit_id].hops: + self._logger.debug("Decrypting encryption layer for hop %s in circuit %s" % (hop.sock_addr, circuit_id)) content = self.crypto.decrypt_str( content, hop.session_keys[ORIGINATOR], hop.session_keys[ORIGINATOR_SALT]) if circuit and is_data and circuit.ctype in [CIRCUIT_TYPE_RENDEZVOUS, CIRCUIT_TYPE_RP]: