Skip to content

Commit

Permalink
support concurrently upload and download (#5)
Browse files Browse the repository at this point in the history
* support concurrently upload and download
* 修正 README.md 和  setup.py
* fix pep8 warning
  • Loading branch information
liuchang0812 authored Oct 9, 2016
1 parent f8dc04f commit d125f5c
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 71 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ python setup.py install
appid = 100000 # 替换为用户的appid
secret_id = u'xxxxxxxx' # 替换为用户的secret_id
secret_key = u'xxxxxxx' # 替换为用户的secret_key
cos_client = CosClient(appid, secret_id, secret_key)
region = "shanghai" # # 替换为用户的region,目前可以为 shanghai/guangzhou
cos_client = CosClient(appid, secret_id, secret_key, region)

# 设置要操作的bucket
bucket = u'mybucket'
Expand Down
2 changes: 2 additions & 0 deletions qcloud_cos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@
from .cos_request import StatFileRequest
from .cos_request import StatFolderRequest
from .cos_request import ListFolderRequest
from .cos_request import DownloadFileRequest

from .cos_auth import Auth
from .cos_cred import CredInfo
9 changes: 7 additions & 2 deletions qcloud_cos/cos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from cos_request import StatFolderRequest
from cos_request import StatFileRequest
from cos_request import ListFolderRequest
from cos_request import DownloadFileRequest
try:
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
Expand All @@ -26,15 +27,15 @@
class CosClient(object):
"""Cos客户端类"""

def __init__(self, appid, secret_id, secret_key):
def __init__(self, appid, secret_id, secret_key, region="shanghai"):
""" 设置用户的相关信息
:param appid: appid
:param secret_id: secret_id
:param secret_key: secret_key
"""
self._cred = CredInfo(appid, secret_id, secret_key)
self._config = CosConfig()
self._config = CosConfig(region=region)
self._http_session = requests.session()
self._file_op = FileOp(self._cred, self._config, self._http_session)
self._folder_op = FolderOp(self._cred, self._config, self._http_session)
Expand Down Expand Up @@ -122,6 +123,10 @@ def update_file(self, request):
assert isinstance(request, UpdateFileRequest)
return self._file_op.update_file(request)

def download_file(self, request):
assert isinstance(request, DownloadFileRequest)
return self._file_op.download_file(request)

def create_folder(self, request):
"""创建目录
Expand Down
82 changes: 61 additions & 21 deletions qcloud_cos/cos_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,62 @@
# -*- coding: utf-8 -*-


class CosConfig(object):
"""CosConfig 有关cos的配置"""
class CosRegionInfo(object):

def __init__(self):
self._end_point = 'http://sh.file.myqcloud.com/files/v2'
self._user_agent = 'cos-python-sdk-v4'
self._timeout = 3
self._sign_expired = 300
def __init__(self, region=None, hostname=None, download_hostname=None, *args, **kwargs):
self._hostname = None
self._download_hostname = None

def set_end_point(self, end_point):
"""设置COS的域名地址
if region == 'shanghai':
self._hostname = 'sh.file.myqcloud.com'
self._download_hostname = 'cossh.myqcloud.com'

elif region == 'guangzhou':
self._hostname = 'gz.file.myqcloud.com"'
self._download_hostname = 'cosgz.myqcloud.com"'

else:
if hostname and download_hostname:
self._hostname = hostname
self._download_hostname = download_hostname
else:
raise ValueError("region or [hostname, download_hostname] must be set, and region should be shanghai/guangzhou")

@property
def hostname(self):
assert self._hostname is not None
return self._hostname

@property
def download_hostname(self):
assert self._download_hostname is not None
return self._download_hostname

:param end_point:
:return:
"""
self._end_point = end_point

def get_end_point(self):
class CosConfig(object):
"""CosConfig 有关cos的配置"""

def __init__(self, timeout=300, sign_expired=300, enable_https=True, *args, **kwargs):
self._region = CosRegionInfo(*args, **kwargs)
self._user_agent = 'cos-python-sdk-v4'
self._timeout = timeout
self._sign_expired = sign_expired
self._enable_https = enable_https
if self._enable_https:
self._protocol = "https"
else:
self._protocol = "http"

def get_endpoint(self):
"""获取域名地址
:return:
"""
return self._end_point
# tmpl = "%s://%s/files/v2"
return self._protocol + "://" + self._region.hostname + "/files/v2"

def get_download_hostname(self):
return self._region.download_hostname

def get_user_agent(self):
"""获取HTTP头中的user_agent
Expand Down Expand Up @@ -65,10 +98,17 @@ def get_sign_expired(self):
"""
return self._sign_expired

@property
def enable_https(self):
"""打开https
TODO(lc): There is a bug if use had invoked SetEndpoint
:return:
"""
self._end_point = 'https://sh.file.myqcloud.com/files/v2'
assert self._enable_https is not None
return self._enable_https

@enable_https.setter
def enable_https(self, val):
if val != self._enable_https:
if val:
self._enable_https = val
self._protocol = "https"
else:
self._enable_https = val
self._protocol = "http"
137 changes: 94 additions & 43 deletions qcloud_cos/cos_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from cos_request import UpdateFolderRequest
from cos_request import StatFolderRequest
from cos_request import DelFolderRequest
from cos_request import ListFolderRequest
from cos_request import ListFolderRequest, DownloadFileRequest
from cos_common import Sha1Util

from logging import getLogger
Expand Down Expand Up @@ -67,12 +67,21 @@ def _build_url(self, bucket, cos_path):
:return:
"""
bucket = bucket.encode('utf8')
end_point = self._config.get_end_point().rstrip('/').encode('utf8')
end_point = self._config.get_endpoint().rstrip('/').encode('utf8')
appid = self._cred.get_appid()
cos_path = urllib.quote(cos_path.encode('utf8'), '~/')
url = '%s/%s/%s%s' % (end_point, appid, bucket, cos_path)
return url

def build_download_url(self, bucket, cos_path, sign):
# Only support http now
appid = self._cred.get_appid()
hostname = self._config.get_download_hostname()
cos_path = urllib.quote(cos_path)
url_tmpl = 'http://{bucket}-{appid}.{hostname}{cos_path}?sign={sign}'

return url_tmpl.format(bucket=bucket, appid=appid, hostname=hostname, cos_path=cos_path, sign=sign)

def send_request(self, method, bucket, cos_path, **kwargs):
""" 发送http请求
Expand All @@ -84,6 +93,7 @@ def send_request(self, method, bucket, cos_path, **kwargs):
"""
url = self._build_url(bucket, cos_path)
logger.debug("sending request, method: %s, bucket: %s, cos_path: %s" % (method, bucket, cos_path))

try:
if method == 'POST':
http_resp = self._http_session.post(url, verify=False, **kwargs)
Expand Down Expand Up @@ -332,10 +342,13 @@ def upload_slice_file(self, request):

if enable_sha1 is True:
sha1_by_slice_list = Sha1Util.get_sha1_by_slice(local_path, slice_size)
request.sha1_list = sha1_by_slice_list
request.sha1_content = sha1_by_slice_list[-1]["datasha"]
else:
sha1_by_slice_list = None
request.sha1_list = None
request.sha1_content = None

control_ret = self._upload_slice_control(request, sha1_by_slice_list)
control_ret = self._upload_slice_control(request)

# 表示控制分片已经产生错误信息
if control_ret[u'code'] != 0:
Expand All @@ -345,50 +358,56 @@ def upload_slice_file(self, request):
if u'access_url' in control_ret[u'data']:
return control_ret

bucket = request.get_bucket_name()
cos_path = request.get_cos_path()
local_path = request.get_local_path()
file_size = os.path.getsize(local_path)
slice_size = control_ret[u'data'][u'slice_size']
offset = 0
session = control_ret[u'data'][u'session']
# ?concurrency
if request._max_con <= 1 or (u'serial_upload' in control_ret[u'data'] and control_ret[u'data'][u'serial_upload'] == 1):

logger.info("upload file serially")
slice_idx = 0
with open(local_path, 'rb') as local_file:

while offset < file_size:
file_content = local_file.read(slice_size)

data_ret = self._upload_slice_data(request, file_content, session, offset)

slice_idx = 0
with open(local_path, 'rb') as local_file:

while offset < file_size:
file_content = local_file.read(slice_size)
retry_count = 0
max_retry = 3
# 如果分片数据上传发生错误, 则进行重试,默认3次
while retry_count < max_retry:
if sha1_by_slice_list is not None:
data_ret = self._upload_slice_data(bucket, cos_path, file_content, session, offset,
sha1_by_slice_list[-1]["datasha"])
else:
data_ret = self._upload_slice_data(bucket, cos_path, file_content, session, offset,
None)
if data_ret[u'code'] == 0:
if u'access_url' in data_ret[u'data']:
return data_ret
else:
break
else:
retry_count += 1
return data_ret

if retry_count == max_retry:
return data_ret
else:
offset += slice_size
slice_idx += 1

if sha1_by_slice_list is not None:
data_ret = self._upload_slice_finish(request, session, file_size, sha1_by_slice_list[-1]["datasha"])
else:
data_ret = self._upload_slice_finish(request, session, file_size, None)
logger.info('upload file concurrently')
from threadpool import SimpleThreadPool
pool = SimpleThreadPool(request._max_con)

slice_idx = 0
with open(local_path, 'rb') as local_file:

while offset < file_size:
file_content = local_file.read(slice_size)

pool.add_task(self._upload_slice_data, request, file_content, session, offset)

offset += slice_size
slice_idx += 1

pool.wait_completion()
result = pool.get_result()
if not result['success_all']:
return {u'code': 1, u'message': str(result)}

data_ret = self._upload_slice_finish(request, session, file_size)
return data_ret

def _upload_slice_finish(self, request, session, filesize, sha1):
def _upload_slice_finish(self, request, session, filesize):
auth = cos_auth.Auth(self._cred)
bucket = request.get_bucket_name()
cos_path = request.get_cos_path()
Expand All @@ -403,13 +422,13 @@ def _upload_slice_finish(self, request, session, filesize, sha1):
http_body['op'] = "upload_slice_finish"
http_body['session'] = session
http_body['filesize'] = str(filesize)
if sha1 is not None:
http_body['sha'] = sha1
if request.sha1_list is not None:
http_body['sha'] = request.sha1_list[-1]["datasha"]
timeout = self._config.get_timeout()

return self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)

def _upload_slice_control(self, request, sha1_by_slice):
def _upload_slice_control(self, request):
"""串行分片第一步, 上传控制分片
:param request:
Expand All @@ -433,8 +452,8 @@ def _upload_slice_control(self, request, sha1_by_slice):
http_body = dict()
http_body['op'] = 'upload_slice_init'
if request.enable_sha1:
http_body['sha'] = sha1_by_slice[-1]["datasha"]
http_body['uploadparts'] = json.dumps(sha1_by_slice)
http_body['sha'] = request.sha1_list[-1]["datasha"]
http_body['uploadparts'] = json.dumps(request.sha1_list)
http_body['filesize'] = str(file_size)
http_body['slice_size'] = str(slice_size)
http_body['biz_attr'] = biz_atrr
Expand All @@ -443,16 +462,17 @@ def _upload_slice_control(self, request, sha1_by_slice):
timeout = self._config.get_timeout()
return self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)

def _upload_slice_data(self, bucket, cos_path, file_content, session, offset, sha1):
def _upload_slice_data(self, request, file_content, session, offset, retry=3):
"""串行分片第二步, 上传数据分片
:param bucket:
:param cos_path:
:param request:
:param file_content:
:param session:
:param offset:
:return:
"""
bucket = request.get_bucket_name()
cos_path = request.get_cos_path()
auth = cos_auth.Auth(self._cred)
expired = int(time.time()) + self._expired_period
sign = auth.sign_more(bucket, cos_path, expired)
Expand All @@ -466,11 +486,42 @@ def _upload_slice_data(self, bucket, cos_path, file_content, session, offset, sh
http_body['filecontent'] = file_content
http_body['session'] = session
http_body['offset'] = str(offset)
if sha1 is not None:
http_body['sha'] = sha1
if request.sha1_content is not None:
http_body['sha'] = request.sha1_content

timeout = self._config.get_timeout()
return self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)

for _ in range(retry):
ret = self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)
if ret['code'] == 0:
return ret
else:
return ret

def __download_url(self, uri, filename):
session = self._http_session

ret = session.get(uri, stream=True)

with open(filename, 'wb') as f:
for chunk in ret.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
f.flush()
ret.close()

def download_file(self, request):
assert isinstance(request, DownloadFileRequest)

auth = cos_auth.Auth(self._cred)
sign = auth.sign_download(request.get_bucket_name(), request.get_cos_path(), self._config.get_sign_expired())
url = self.build_download_url(request.get_bucket_name(), request.get_cos_path(), sign)
logger.info("Uri is %s" % url)
try:
self.__download_url(url, request._local_filename)
return {u'code': 0, u'message': "download successfully"}
except Exception as e:
return {u'code': 1, u'message': "download failed, exception: " + str(e)}


class FolderOp(BaseOp):
Expand Down
Loading

0 comments on commit d125f5c

Please sign in to comment.