Skip to content

Commit

Permalink
Feature switch (#307)
Browse files Browse the repository at this point in the history
* update

* update

* update

* update

---------

Co-authored-by: unknown <[email protected]>
Co-authored-by: lewzylu(卢众意) <[email protected]>
  • Loading branch information
3 people authored Feb 4, 2024
1 parent af77f48 commit a8b67b1
Show file tree
Hide file tree
Showing 2 changed files with 370 additions and 208 deletions.
124 changes: 59 additions & 65 deletions coscmd/cos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
class CoscmdConfig(object):

def __init__(self, appid, region, endpoint, bucket, secret_id, secret_key, token=None,
part_size=1, max_thread=5, schema='https', anonymous=False, verify='md5', retry=2, timeout=60, silence=False,
multiupload_threshold=100, multidownload_threshold=100,
part_size=1, max_thread=5, schema='https', anonymous=False, verify='md5', retry=2, timeout=60, silence=False,
multiupload_threshold=100, multidownload_threshold=100, enable_old_domain=True, enable_internal_domain=True, auto_switch_domain=True,
*args, **kwargs):
self._appid = appid
self._region = region
Expand All @@ -63,8 +63,11 @@ def __init__(self, appid, region, endpoint, bucket, secret_id, secret_key, token
self._retry = retry
self._timeout = timeout
self._silence = silence
self._multiupload_threshold=multiupload_threshold
self._multidownload_threshold=multidownload_threshold
self._multiupload_threshold = multiupload_threshold
self._multidownload_threshold = multidownload_threshold
self._enable_old_domain = enable_old_domain
self._enable_internal_domain = enable_internal_domain,
self._auto_switch_domain = auto_switch_domain
self._ua = 'coscmd-v' + Version
logger.debug("config parameter-> appid: {appid}, region: {region}, endpoint: {endpoint}, bucket: {bucket}, part_size: {part_size}, max_thread: {max_thread}".format(
appid=appid,
Expand Down Expand Up @@ -131,6 +134,10 @@ def __init__(self, conf, session=None):
self._inner_threadpool = SimpleThreadPool(1)
self._multiupload_threshold = conf._multiupload_threshold * 1024 * 1024
self._multidownload_threshold = conf._multidownload_threshold * 1024 * 1024
self._enable_old_domain = conf._enable_old_domain,
self._enable_internal_domain = conf._enable_internal_domain,
self._auto_switch_domain = conf._auto_switch_domain

self.consumed_bytes = 0
try:
if conf._endpoint != "":
Expand All @@ -142,7 +149,10 @@ def __init__(self, conf, session=None):
Scheme=conf._schema,
Anonymous=conf._anonymous,
UA=self._ua,
Timeout=self._timeout)
Timeout=self._timeout,
EnableOldDomain=self._enable_old_domain,
EnableInternalDomain=self._enable_internal_domain,
AutoSwitchDomainOnRetry=self._auto_switch_domain)
else:
sdk_config = qcloud_cos.CosConfig(Region=conf._region,
SecretId=conf._secret_id,
Expand All @@ -151,11 +161,14 @@ def __init__(self, conf, session=None):
Scheme=conf._schema,
Anonymous=conf._anonymous,
UA=self._ua,
Timeout=self._timeout)
Timeout=self._timeout,
EnableOldDomain=self._enable_old_domain,
EnableInternalDomain=self._enable_internal_domain,
AutoSwitchDomainOnRetry=self._auto_switch_domain)
self._client = qcloud_cos.CosS3Client(sdk_config, self._retry)
except Exception as e:
logger.warn(to_unicode(e))
raise(e)
raise (e)
if session is None:
self._session = requests.session()
else:
Expand Down Expand Up @@ -279,7 +292,7 @@ def upload_file_list(upload_filelist):
upload_filelist = []
# BFS上传文件夹
try:
while(not q.empty()):
while (not q.empty()):
[local_path, cos_path] = q.get()
local_path = to_unicode(local_path)
cos_path = to_unicode(cos_path)
Expand Down Expand Up @@ -394,35 +407,20 @@ def single_upload(self, local_path, cos_path, _http_headers='{}', **kwargs):
logger.warn(to_unicode(e))
return -1
try:
if len(local_path) == 0:
data = ""
else:
with open(local_path, 'rb') as File:
data = File.read()
http_header = _http_header
http_header['x-cos-meta-md5'] = _md5
http_headers = mapped(http_header)
self._client.put_object_from_local_file(
Bucket=self._conf._bucket,
LocalFilePath=local_path,
Key=cos_path,
EnableMD5=(not kwargs['skipmd5']),
**http_headers
)
except Exception as e:
logger.warn("Upload file failed")
logger.warn(to_unicode(e))
return 0
url = self._conf.uri(path=quote(to_printable_str(cos_path)))
for j in range(self._retry):
try:
if j > 0:
logger.info(u"Retry to upload {local_path} => cos://{bucket}/{cos_path}".format(
bucket=self._conf._bucket,
local_path=local_path,
cos_path=cos_path))
http_header = _http_header
http_header['x-cos-meta-md5'] = _md5
rt = self._session.put(url=url,
auth=CosS3Auth(self._conf), data=data, headers=http_header, timeout=self._timeout)
if rt.status_code == 200:
return 0
else:
raise Exception(response_info(rt))
except Exception as e:
self._session.close()
self._session = requests.session()
logger.warn(to_unicode(e))
time.sleep(2**j)
return -1
return -1

def multipart_upload(self, local_path, cos_path, _http_headers='{}', **kwargs):
Expand Down Expand Up @@ -987,31 +985,27 @@ def delete_file(self, cos_path, **kwargs):
if query_yes_no(u"WARN: you are deleting the file in the '{cos_path}' cos_path, please make sure".format(cos_path=cos_path)) is False:
return -3
_versionId = kwargs["versionId"]
url = self._conf.uri(path="{path}?versionId={versionId}"
.format(path=quote(to_printable_str(cos_path)), versionId=_versionId))
try:
rt = self._session.delete(url=url, auth=CosS3Auth(self._conf))
logger.debug(u"init resp, status code: {code}, headers: {headers}".format(
code=rt.status_code,
headers=rt.headers))
if rt.status_code == 204 or rt.status_code == 200:
if _versionId == "":
logger.info(u"Delete cos://{bucket}/{cos_path}".format(
bucket=self._conf._bucket,
cos_path=cos_path))
else:
logger.info(u"Delete cos://{bucket}/{cos_path}?versionId={versionId}".format(
bucket=self._conf._bucket,
cos_path=cos_path,
versionId=_versionId))
return 0
if _versionId == "":
self._client.delete_object(
Bucket=self._conf._bucket,
Key=cos_path)
logger.info(u"Delete cos://{bucket}/{cos_path}".format(
bucket=self._conf._bucket,
cos_path=cos_path))
else:
logger.warn(response_info(rt))
return -1
self._client.delete_object(
Bucket=self._conf._bucket,
Key=cos_path,
VersionId=_versionId)
logger.info(u"Delete cos://{bucket}/{cos_path}?versionId={versionId}".format(
bucket=self._conf._bucket,
cos_path=cos_path,
versionId=_versionId))
return 0
except Exception as e:
logger.warn(str(e))
return -1
return -1

def list_multipart_uploads(self, cos_path):
logger.debug("getting uploaded parts")
Expand Down Expand Up @@ -1498,14 +1492,14 @@ def download_file(self, cos_path, local_path, _http_headers='{}', **kwargs):
if file_size <= self._multidownload_threshold:
try:
self._client.download_file(
Bucket=self._conf._bucket,
Key=cos_path,
DestFilePath=local_path,
PartSize=self._conf._part_size,
MAXThread=self._conf._max_thread,
EnableCRC=False,
**_http_headers
)
Bucket=self._conf._bucket,
Key=cos_path,
DestFilePath=local_path,
PartSize=self._conf._part_size,
MAXThread=self._conf._max_thread,
EnableCRC=False,
**_http_headers
)
except CosServiceError as e:
logger.info(u"Download cos://{bucket}/{cos_path} => {local_path} failed".format(
bucket=self._conf._bucket,
Expand All @@ -1519,8 +1513,8 @@ def download_file(self, cos_path, local_path, _http_headers='{}', **kwargs):
else:
try:
self._pbar = tqdm(total=file_size, unit='B', ncols=80,
disable=self._silence, unit_divisor=1024, unit_scale=True)
self.consumed_bytes = 0
disable=self._silence, unit_divisor=1024, unit_scale=True)
self.consumed_bytes = 0
self._client.download_file(
Bucket=self._conf._bucket,
Key=cos_path,
Expand Down
Loading

0 comments on commit a8b67b1

Please sign in to comment.