Skip to content

Commit

Permalink
Merge pull request #140 from lbryio/reflect-blobs
Browse files Browse the repository at this point in the history
reflect_all_blobs daemon command
jackrobison authored Aug 28, 2016
2 parents 319e7ec + aa66af7 commit 25c71bc
Showing 5 changed files with 224 additions and 3 deletions.
35 changes: 34 additions & 1 deletion lbrynet/lbrynet_daemon/LBRYDaemon.py
Original file line number Diff line number Diff line change
@@ -1368,6 +1368,24 @@ def _reflect(self, lbry_file):
d.addCallback(lambda _: factory.finished_deferred)
return d

def _reflect_blobs(self, blob_hashes):
if not blob_hashes:
return defer.fail(Exception("no lbry file given to reflect"))

log.info("Reflecting %i blobs" % len(blob_hashes))

reflector_server = random.choice(REFLECTOR_SERVERS)
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
log.info("Start reflector client")
factory = reflector.BlobClientFactory(
self.session.blob_manager,
blob_hashes
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
return d

def _log_to_slack(self, msg):
URL = "https://hooks.slack.com/services/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"
msg = platform.platform() + ": " + base58.b58encode(self.lbryid)[:20] + ", " + msg
@@ -2443,7 +2461,7 @@ def jsonrpc_reflect(self, p):
Reflect a stream
Args:
sd_hash
sd_hash: sd_hash of lbry file
Returns:
True or traceback
"""
@@ -2468,6 +2486,21 @@ def jsonrpc_get_blob_hashes(self):
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d

def jsonrpc_reflect_all_blobs(self):
"""
Reflects all saved blobs
Args:
None
Returns:
True
"""

d = self.session.blob_manager.get_all_verified_blobs()
d.addCallback(self._reflect_blobs)
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d


def get_lbrynet_version_from_github():
"""Return the latest released version from github."""
1 change: 1 addition & 0 deletions lbrynet/reflector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory
from lbrynet.reflector.client.client import LBRYFileReflectorClientFactory as ClientFactory
from lbrynet.reflector.client.client import LBRYBlobReflectorClientFactory as BlobClientFactory
185 changes: 185 additions & 0 deletions lbrynet/reflector/client/client.py
Original file line number Diff line number Diff line change
@@ -267,3 +267,188 @@ def clientConnectionLost(self, connector, reason):

def clientConnectionFailed(self, connector, reason):
log.debug("connection failed: %s", reason)


class LBRYBlobReflectorClient(Protocol):
# Protocol stuff

def connectionMade(self):
self.blob_manager = self.factory.blob_manager
self.response_buff = ''
self.outgoing_buff = ''
self.blob_hashes_to_send = self.factory.blobs
self.next_blob_to_send = None
self.blob_read_handle = None
self.received_handshake_response = False
self.protocol_version = None
self.file_sender = None
self.producer = None
self.streaming = False
d = self.send_handshake()
d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))

def dataReceived(self, data):
log.debug('Recieved %s', data)
self.response_buff += data
try:
msg = self.parse_response(self.response_buff)
except IncompleteResponseError:
pass
else:
self.response_buff = ''
d = self.handle_response(msg)
d.addCallback(lambda _: self.send_next_request())
d.addErrback(self.response_failure_handler)

def connectionLost(self, reason):
if reason.check(error.ConnectionDone):
log.debug('Finished sending data via reflector')
self.factory.finished_deferred.callback(True)
else:
log.debug('reflector finished: %s', reason)
self.factory.finished_deferred.callback(reason)

# IConsumer stuff

def registerProducer(self, producer, streaming):
self.producer = producer
self.streaming = streaming
if self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)

def unregisterProducer(self):
self.producer = None

def write(self, data):
self.transport.write(data)
if self.producer is not None and self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)

def send_handshake(self):
log.debug('Sending handshake')
self.write(json.dumps({'version': 0}))
return defer.succeed(None)

def parse_response(self, buff):
try:
return json.loads(buff)
except ValueError:
raise IncompleteResponseError()

def response_failure_handler(self, err):
log.warning("An error occurred handling the response: %s", err.getTraceback())

def handle_response(self, response_dict):
if self.received_handshake_response is False:
return self.handle_handshake_response(response_dict)
else:
return self.handle_normal_response(response_dict)

def set_not_uploading(self):
if self.next_blob_to_send is not None:
self.next_blob_to_send.close_read_handle(self.read_handle)
self.read_handle = None
self.next_blob_to_send = None
self.file_sender = None
return defer.succeed(None)

def start_transfer(self):
self.write(json.dumps({}))
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self)
return d

def handle_handshake_response(self, response_dict):
if 'version' not in response_dict:
raise ValueError("Need protocol version number!")
self.protocol_version = int(response_dict['version'])
if self.protocol_version != 0:
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
self.received_handshake_response = True
return defer.succeed(True)

def handle_normal_response(self, response_dict):
if self.file_sender is None: # Expecting Server Info Response
if 'send_blob' not in response_dict:
raise ValueError("I don't know whether to send the blob or not!")
if response_dict['send_blob'] is True:
self.file_sender = FileSender()
return defer.succeed(True)
else:
return self.set_not_uploading()
else: # Expecting Server Blob Response
if 'received_blob' not in response_dict:
raise ValueError("I don't know if the blob made it to the intended destination!")
else:
return self.set_not_uploading()

def open_blob_for_reading(self, blob):
if blob.is_validated():
read_handle = blob.open_for_reading()
if read_handle is not None:
log.debug('Getting ready to send %s', blob.blob_hash)
self.next_blob_to_send = blob
self.read_handle = read_handle
return None
raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))

def send_blob_info(self):
log.info("Send blob info for %s", self.next_blob_to_send.blob_hash)
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
log.debug('sending blob info')
self.write(json.dumps({
'blob_hash': self.next_blob_to_send.blob_hash,
'blob_size': self.next_blob_to_send.length
}))

def send_next_request(self):
if self.file_sender is not None:
# send the blob
log.debug('Sending the blob')
return self.start_transfer()
elif self.blob_hashes_to_send:
# open the next blob to send
blob_hash = self.blob_hashes_to_send[0]
log.debug('No current blob, sending the next one: %s', blob_hash)
self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
d = self.blob_manager.get_blob(blob_hash, True)
d.addCallback(self.open_blob_for_reading)
# send the server the next blob hash + length
d.addCallback(lambda _: self.send_blob_info())
return d
else:
# close connection
log.debug('No more blob hashes, closing connection')
self.transport.loseConnection()


class LBRYBlobReflectorClientFactory(ClientFactory):
protocol = LBRYBlobReflectorClient

def __init__(self, blob_manager, blobs):
self.blob_manager = blob_manager
self.blobs = blobs
self.p = None
self.finished_deferred = defer.Deferred()

def buildProtocol(self, addr):
p = self.protocol()
p.factory = self
self.p = p
return p

def startFactory(self):
log.debug('Starting reflector factory')
ClientFactory.startFactory(self)

def startedConnecting(self, connector):
log.debug('Started connecting')

def clientConnectionLost(self, connector, reason):
"""If we get disconnected, reconnect to server."""
log.debug("connection lost: %s", reason)

def clientConnectionFailed(self, connector, reason):
log.debug("connection failed: %s", reason)
1 change: 0 additions & 1 deletion lbrynet/reflector/server/server.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@
from twisted.internet import error, defer
from twisted.internet.protocol import Protocol, ServerFactory
import json

from lbrynet.core.utils import is_valid_blobhash


5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -26,7 +26,10 @@
requires = ['pycrypto', 'twisted', 'miniupnpc', 'yapsy', 'seccure',
'python-bitcoinrpc==0.1', 'txJSON-RPC', 'requests>=2.4.2', 'unqlite==0.2.0',
'leveldb', 'lbryum', 'jsonrpc', 'simplejson', 'appdirs', 'six==1.9.0', 'base58', 'googlefinance',
'requests_futures', 'service_identity']
'requests_futures']

if sys.platform.startswith("linux"):
requires.append('service-identity')

setup(name='lbrynet',
description='A decentralized media library and marketplace',

0 comments on commit 25c71bc

Please sign in to comment.