Skip to content

Commit

Permalink
Bugfixes and refactoring on circuit creation
Browse files Browse the repository at this point in the history
  • Loading branch information
rjruigrok committed Apr 18, 2015
1 parent 0fb9c08 commit b3439a4
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 44 deletions.
14 changes: 12 additions & 2 deletions Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,13 @@ def create_engine_wrapper(self, lm_network_engine_wrapper_created_callback, psta
if not self.cew_scheduled:
self.ltmgr = self.session.lm.ltmgr
dht_ok = not isinstance(self.tdef, TorrentDefNoMetainfo) or self.ltmgr.is_dht_ready()
session_ok = self.ltmgr.tunnels_ready(self) == 1
tunnel_community = self.ltmgr.trsession.lm.tunnel_community
if tunnel_community:
tunnels_ready = tunnel_community.tunnels_ready(self.get_hops(), self.get_def().is_anonymous())
else:
tunnels_ready = 1

session_ok = tunnels_ready == 1

if not self.ltmgr or not dht_ok or not session_ok:
self._logger.info(u"LTMGR/DHT/session not ready, rescheduling create_engine_wrapper")
Expand Down Expand Up @@ -892,7 +898,11 @@ def network_get_state(self, usercallback, getpeerlist, sessioncalling=False):
if self.dlstate != DLSTATUS_CIRCUITS:
progress = self.progressbeforestop
else:
progress = self.ltmgr.tunnels_ready(self)
tunnel_community = self.ltmgr.trsession.lm.tunnel_community
if tunnel_community:
progress = tunnel_community.tunnels_ready(self.get_hops(), self.get_def().is_anonymous())
else:
progress = 1
ds = DownloadState(self, self.dlstate, self.error, progress)
else:
(status, stats, seeding_stats, logmsgs) = self.network_get_stats(getpeerlist)
Expand Down
16 changes: 0 additions & 16 deletions Tribler/Core/Libtorrent/LibtorrentMgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,22 +126,6 @@ def get_session(self, hops=0):

return self.ltsessions[hops]

def tunnels_ready(self, download):
hops = download.get_hops()
if hops > 0:
tunnel_community = self.trsession.lm.tunnel_community
if tunnel_community:
if download.get_def().is_anonymous():
current_hops = tunnel_community.circuits_needed.get(hops, 0)
tunnel_community.circuits_needed[hops] = max(1, current_hops)
return bool(tunnel_community.active_data_circuits(hops))
else:
tunnel_community.circuits_needed[hops] = tunnel_community.settings.max_circuits
return min(1, len(tunnel_community.active_data_circuits(hops)) /
float(tunnel_community.settings.min_circuits))
return 0
return 1

def shutdown(self):
# Save DHT state
dhtstate_file = open(os.path.join(self.trsession.get_state_dir(), DHTSTATE_FILENAME), 'w')
Expand Down
12 changes: 7 additions & 5 deletions Tribler/Main/vwxGUI/home.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,8 @@ def AddComponents(self):
self.circuit_list.InsertColumn(0, 'ID', wx.LIST_FORMAT_LEFT, 25)
self.circuit_list.InsertColumn(1, 'Online', wx.LIST_FORMAT_RIGHT, 50)
self.circuit_list.InsertColumn(2, 'Hops', wx.LIST_FORMAT_RIGHT, 45)
self.circuit_list.InsertColumn(3, u'Bytes \u2191', wx.LIST_FORMAT_RIGHT, 63)
self.circuit_list.InsertColumn(4, u'Bytes \u2193', wx.LIST_FORMAT_RIGHT, 63)
self.circuit_list.InsertColumn(3, u'Bytes \u2191', wx.LIST_FORMAT_RIGHT, 83)
self.circuit_list.InsertColumn(4, u'Bytes \u2193', wx.LIST_FORMAT_RIGHT, 83)
self.circuit_list.InsertColumn(5, 'Uptime', wx.LIST_FORMAT_RIGHT, 54)
self.circuit_list.setResizeColumn(0)
self.circuit_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.OnItemSelected)
Expand All @@ -660,7 +660,8 @@ def AddComponents(self):
self.log_text = wx.TextCtrl(self, style=wx.TE_MULTILINE | wx.BORDER_SIMPLE | wx.HSCROLL & wx.VSCROLL)
self.log_text.SetEditable(False)
self.log_text.Show(self.fullscreen)
self.num_circuits_label = wx.StaticText(self, -1, "You have 0 circuit(s); 0 relay(s); 0 exit socket(s)")
self.num_circuits_label = wx.StaticText(self, -1, "You have 0 circuit(s); 0 relay(s); \
0 exit socket(s); 0 candidate(s)")

self.vSizer = wx.BoxSizer(wx.VERTICAL)
self.vSizer.Add(self.circuit_list, 1, wx.EXPAND | wx.RESERVE_SPACE_EVEN_IF_HIDDEN, 0)
Expand Down Expand Up @@ -697,10 +698,11 @@ def OnUpdateCircuits(self, event):
return

if self.fullscreen:
self.num_circuits_label.SetLabel("You have %d circuit(s); %d relay(s); %d exit socket(s)" %
self.num_circuits_label.SetLabel("You have %d circuit(s); %d relay(s); %d exit socket(s); %d candidate(s)" %
(len(self.tunnel_community.circuits),
len(self.tunnel_community.relay_from_to),
len(self.tunnel_community.exit_sockets)))
len(self.tunnel_community.exit_sockets),
sum(1 for _ in self.tunnel_community.dispersy_yield_verified_candidates())))

new_circuits = dict(self.tunnel_community.circuits)
self.circuits = {k: v for k, v in new_circuits.iteritems() if v.goal_hops == self.hops or self.hops < 0}
Expand Down
2 changes: 2 additions & 0 deletions Tribler/Test/test_as_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def tearDown(self, annotate=True):
for dc in delayed_calls:
self._logger.debug("> %s" % dc)
self.assertFalse(delayed_calls, "The reactor was dirty when tearing down the test")
self.assertFalse(Session.has_instance(), 'A session instance is still present when tearing down the test')

def tearDownCleanup(self):
self.setUpCleanup()
Expand Down Expand Up @@ -301,6 +302,7 @@ class TestGuiAsServer(TestAsServer):
"""

def setUp(self):
self.assertFalse(Session.has_instance(), 'A session instance is already present when setting up the test')
AbstractServer.setUp(self, annotate=False)

self.app = wx.GetApp()
Expand Down
13 changes: 8 additions & 5 deletions Tribler/Test/test_tunnel_community.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from Tribler.dispersy.util import blockingCallFromThread
from Tribler.community.tunnel.tunnel_community import TunnelSettings
from Tribler.community.tunnel.hidden_community import HiddenTunnelCommunity
from Tribler.dispersy.crypto import NoCrypto


class TestTunnelCommunity(TestGuiAsServer):
Expand Down Expand Up @@ -328,9 +329,9 @@ def cb_dht(info_hash, peers, source):
self.CallConditional(60, dht.is_set, lambda: self.Call(5, lambda: start_download(tf)),
'Introduction point did not get announced')

self.startTest(setup_seeder)
self.startTest(setup_seeder, nr_relays=6, nr_exitnodes=4)

def startTest(self, callback, min_timeout=5, nr_relays=5, nr_exitnodes=3):
def startTest(self, callback, min_timeout=5, nr_relays=5, nr_exitnodes=3, crypto_enabled=True):
from Tribler.Main import tribler_main
tribler_main.FORCE_ENABLE_TUNNEL_COMMUNITY = True
tribler_main.TUNNEL_COMMUNITY_DO_TEST = False
Expand All @@ -341,11 +342,11 @@ def setup_proxies():
tunnel_communities = []
baseindex = 3
for i in range(baseindex, baseindex + nr_relays): # Normal relays
tunnel_communities.append(create_proxy(i, False))
tunnel_communities.append(create_proxy(i, False, crypto_enabled))

baseindex += nr_relays + 1
for i in range(baseindex, baseindex + nr_exitnodes): # Exit nodes
tunnel_communities.append(create_proxy(i, True))
tunnel_communities.append(create_proxy(i, True, crypto_enabled))

# Connect the proxies to the Tribler instance
for community in self.lm.dispersy.get_communities():
Expand All @@ -367,7 +368,7 @@ def setup_proxies():

callback(tunnel_communities)

def create_proxy(index, become_exit_node):
def create_proxy(index, become_exit_node, crypto_enabled):
from Tribler.Core.Session import Session

self.setUpPreSession()
Expand All @@ -393,6 +394,8 @@ def load_community(session):
dispersy_member = dispersy.get_member(private_key=dispersy.crypto.key_to_bin(keypair))
settings = TunnelSettings(tribler_session=session)
settings.do_test = False
if not crypto_enabled:
settings.crypto = NoCrypto()
settings.become_exitnode = become_exit_node
return dispersy.define_auto_load(HiddenTunnelCommunity, dispersy_member, (session, settings), load=True)[0]

Expand Down
23 changes: 12 additions & 11 deletions Tribler/community/tunnel/hidden_community.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from Tribler.community.tunnel.payload import (
EstablishIntroPayload, IntroEstablishedPayload, EstablishRendezvousPayload,
RendezvousEstablishedPayload, KeyResponsePayload, KeyRequestPayload,
RendezvousEstablishedPayload, KeyResponsePayload, KeyRequestPayload,
CreateE2EPayload, CreatedE2EPayload, LinkE2EPayload, LinkedE2EPayload)
from Tribler.community.tunnel.routing import RelayRoute, RendezvousPoint, Hop

Expand Down Expand Up @@ -51,8 +51,9 @@ def __init__(self, community, rp):
self.rp = rp

def on_timeout(self):
self._logger.debug("RPRequestCache: no response on establish-rendezvous (circuit %d)", self.rp.circuit_id)
self.community.remove_circuit(self.circuit.circuit_id, 'establish-rendezvous timeout')
self._logger.debug("RPRequestCache: no response on establish-rendezvous (circuit %d)",
self.rp.circuit.circuit_id)
self.community.remove_circuit(self.rp.circuit.circuit_id, 'establish-rendezvous timeout')


class KeyRequestCache(RandomNumberCache):
Expand Down Expand Up @@ -251,11 +252,11 @@ def check_key_request(self, messages):
for message in messages:
info_hash = message.payload.info_hash
if not message.source.startswith(u"circuit_"):
if not info_hash in self.intro_point_for:
if info_hash not in self.intro_point_for:
yield DropMessage(message, "not an intro point for this infohash")
continue
else:
if not info_hash in self.session_keys:
if info_hash not in self.session_keys:
yield DropMessage(message, "not seeding this infohash")
continue

Expand Down Expand Up @@ -314,18 +315,18 @@ def on_create_e2e(self, messages):
relay_circuit.tunnel_data(message.candidate.sock_addr, TUNNEL_PREFIX + message.packet)

else:
self.create_rendevous_point(
DEFAULT_HOPS, lambda rendevous_point, message=message: self.create_created_e2e(rendevous_point, message))
self.create_rendezvous_point(
DEFAULT_HOPS, lambda rendezvous_point, message=message: self.create_created_e2e(rendezvous_point, message))

def create_created_e2e(self, rendevous_point, message):
def create_created_e2e(self, rendezvous_point, message):
info_hash = message.payload.info_hash
key = self.session_keys[info_hash]

circuit = self.circuits[int(message.source[8:])]
shared_secret, Y, AUTH = self.crypto.generate_diffie_shared_secret(message.payload.key, key)
rendevous_point.circuit.hs_session_keys = self.crypto.generate_session_keys(shared_secret)
rendezvous_point.circuit.hs_session_keys = self.crypto.generate_session_keys(shared_secret)
rp_info_enc = self.crypto.encrypt_str(
encode((rendevous_point.rp_info, rendevous_point.cookie)), *self.get_session_keys(rendevous_point.circuit.hs_session_keys, EXIT_NODE))
encode((rendezvous_point.rp_info, rendezvous_point.cookie)), *self.get_session_keys(rendezvous_point.circuit.hs_session_keys, EXIT_NODE))

meta = self.get_meta_message(u'created-e2e')
response = meta.impl(distribution=(self.global_time,), payload=(
Expand Down Expand Up @@ -477,7 +478,7 @@ def on_intro_established(self, messages):
self.request_cache.pop(u"establish-intro", message.payload.identifier)
self._logger.info("Got intro-established from %s", message.candidate)

def create_rendevous_point(self, hops, finished_callback):
def create_rendezvous_point(self, hops, finished_callback):
def callback(circuit):
# We got a circuit, now let's create a rendezvous point
circuit_id = circuit.circuit_id
Expand Down
23 changes: 18 additions & 5 deletions Tribler/community/tunnel/tunnel_community.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from Tribler.dispersy.util import call_on_reactor_thread
from Tribler.dispersy.requestcache import NumberCache, RandomNumberCache
from Tribler.community.bartercast4.statistics import BartercastStatisticTypes, _barter_statistics
from Tribler.Core.CacheDB.sqlitecachedb import bin2str


class CircuitRequestCache(NumberCache):
Expand Down Expand Up @@ -413,6 +414,17 @@ def do_circuits(self):

self.do_remove()

def tunnels_ready(self, hops, anonymous):
if hops > 0:
if anonymous:
current_hops = self.circuits_needed.get(hops, 0)
self.circuits_needed[hops] = max(1, current_hops)
return bool(self.active_data_circuits(hops))
else:
self.circuits_needed[hops] = self.settings.max_circuits
return min(1, len(self.active_data_circuits(hops)) / float(self.settings.min_circuits))
return 1

def do_remove(self):
# Remove circuits that are inactive / are too old / have transferred too many bytes.
for key, circuit in self.circuits.items():
Expand Down Expand Up @@ -464,17 +476,17 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, max_
break

if not required_exit:
self._logger.debug("Look for connectable exit node to set as required_exit for this circuit")
self._logger.debug("Look for exit node to set as required_exit for this circuit")
# Each circuit's exit node should be a verified connectable exit node peer chosen by the circuit initiator
for c in self.dispersy_yield_verified_candidates():
pubkey = c.get_member().public_key
exit_candidate = self.exit_candidates[pubkey]
if exit_candidate.become_exit and self.candidate_is_connectable(c):
self._logger.debug("Valid exit node found for this circuit")
if exit_candidate.become_exit:
self._logger.debug("Valid exit candidate found for this circuit")
required_exit = (c.sock_addr[0], c.sock_addr[1], pubkey)
# Stop looking for a better alternative if the exit-node is not used for exiting in another circuit
if c.sock_addr not in hops:
self._logger.debug("Exit node not used in other circuits, best choice")
if c.sock_addr not in hops and self.candidate_is_connectable(c):
self._logger.debug("Exit node is connectable and not used in other circuits, that's prefered")
break

if not required_exit:
Expand Down Expand Up @@ -1175,6 +1187,7 @@ def crypto_in(self, circuit_id, content, is_data=False):
if circuit and len(circuit.hops) > 0:
# Remove all the encryption layers
for hop in self.circuits[circuit_id].hops:
self._logger.debug("Decrypting encryption layer for hop %s in circuit %s" % (hop.address, circuit_id))
content = self.crypto.decrypt_str(
content, hop.session_keys[ORIGINATOR], hop.session_keys[ORIGINATOR_SALT])
if circuit and is_data and circuit.ctype in [CIRCUIT_TYPE_RENDEZVOUS, CIRCUIT_TYPE_RP]:
Expand Down

0 comments on commit b3439a4

Please sign in to comment.