From e4a5c99bf1c526b3c43a69685ef38889d87fe47e Mon Sep 17 00:00:00 2001 From: bhargav191098 Date: Thu, 20 Jun 2024 20:04:42 -0700 Subject: [PATCH 1/3] byte support for upload --- python/fedml/api/modules/storage.py | 66 +++++++++++++++++++---------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/python/fedml/api/modules/storage.py b/python/fedml/api/modules/storage.py index 33e781be08..736d5509e4 100644 --- a/python/fedml/api/modules/storage.py +++ b/python/fedml/api/modules/storage.py @@ -30,13 +30,16 @@ def __init__(self, data: dict): class DataType(Enum): FILE = "file" DIRECTORY = "directory" + BYTE = "byte" INVALID = "invalid" + + # Todo (alaydshah): Store service name in metadata # Todo (alaydshah): If data already exists, don't upload again. Instead suggest to use update command # Todo (bhargav) : Discuss and remove the service variable. Maybe needed sometime later. def upload(data_path, api_key, name, description, tag_list, service, show_progress, out_progress_to_err, progress_desc, - metadata) -> FedMLResponse: + metadata, byte_data_flag=False, byte_data=None) -> FedMLResponse: api_key = authenticate(api_key) user_id, message = _get_user_id_from_api_key(api_key) @@ -44,12 +47,12 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre if user_id is None: return FedMLResponse(code=ResponseCode.FAILURE, message=message) - data_type = _get_data_type(data_path) + data_type = _get_data_type(data_path, byte_data_flag) - if(data_type == DataType.INVALID): + if data_type == DataType.INVALID: return FedMLResponse(code=ResponseCode.FAILURE,message="Invalid data path") - if(data_type == DataType.DIRECTORY): + if data_type == DataType.DIRECTORY: to_upload_path, message = _archive_data(data_path) name = os.path.splitext(os.path.basename(to_upload_path))[0] if name is None else name file_name = name + ".zip" @@ -67,18 +70,24 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre file_name = name - if not to_upload_path: + if not to_upload_path and not byte_data_flag: return FedMLResponse(code=ResponseCode.FAILURE, message=message) #TODO(bhargav191098) - Better done on the backend. Remove and pass file_name once completed on backend. dest_path = os.path.join(user_id, file_name) - file_size = os.path.getsize(to_upload_path) + max_chunk_size = 20 * 1024 * 1024 + + if byte_data_flag: + file_size = sum(len(chunk) for chunk in get_chunks_from_byte_data(byte_data, max_chunk_size)) - file_uploaded_url, message = _upload_multipart(api_key, dest_path, to_upload_path, show_progress, + else: + file_size = os.path.getsize(to_upload_path) + + file_uploaded_url, message = _upload_multipart(api_key, dest_path, file_size, max_chunk_size, to_upload_path, show_progress, out_progress_to_err, - progress_desc, metadata) + progress_desc, metadata, byte_data_flag, byte_data) - if(data_type == "dir"): + if data_type == "dir": os.remove(to_upload_path) if not file_uploaded_url: return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {to_upload_path}") @@ -262,6 +271,13 @@ def get_chunks(file_path, chunk_size): break yield chunk +def get_chunks_from_byte_data(byte_data, chunk_size): + while True: + chunk = byte_data.read(chunk_size) + if not chunk: + break + yield chunk + def _get_presigned_url(api_key, request_url, file_name, part_number=None): cert_path = MLOpsConfigs.get_cert_path_with_version() @@ -287,7 +303,7 @@ def _upload_part(url,part_data,session): return response -def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20,session=None): +def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20,session=None, byte_data_flag= False): for retry_attempt in range(max_retries): try: response = _upload_part(presigned_url,chunk,session) @@ -297,11 +313,12 @@ def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20,session= else: raise requests.exceptions.RequestException - if(pbar is not None): - pbar.update(chunk.__sizeof__()) + if pbar is not None: + pbar.update(len(chunk)) return {'etag': response.headers['ETag'], 'partNumber': part} raise requests.exceptions.RequestException + def _process_post_response(response): if response.status_code != 200: message = (f"Failed to complete multipart upload with status code = {response.status_code}, " @@ -345,14 +362,10 @@ def _complete_multipart_upload(api_key, file_key, part_info, upload_id): return _process_post_response(complete_multipart_response) -def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_progress_to_err, - progress_desc_text, metadata): +def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive_path, show_progress, out_progress_to_err, + progress_desc_text, metadata, byte_data_flag, byte_data): request_url = ServerConstants.get_presigned_multi_part_url() - file_size = os.path.getsize(archive_path) - - max_chunk_size = 20 * 1024 * 1024 - num_chunks = _get_num_chunks(file_size, max_chunk_size) upload_id = "" @@ -379,8 +392,12 @@ def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_p upload_id = data['uploadId'] presigned_urls = data['urls'] - parts = [] - chunks = get_chunks(archive_path, max_chunk_size) + if byte_data_flag: + byte_data.seek(0) + chunks = get_chunks_from_byte_data(byte_data, max_chunk_size) + else: + chunks = get_chunks(archive_path, max_chunk_size) + part_info = [] chunk_count = 0 successful_chunks = 0 @@ -396,7 +413,7 @@ def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_p if show_progress: try: part_data = _upload_chunk(presigned_url=presigned_url, chunk=chunk, part=part, - pbar=pbar,session=atomic_session) + pbar=pbar,session=atomic_session, byte_data_flag = byte_data_flag) part_info.append(part_data) successful_chunks += 1 except Exception as e: @@ -474,8 +491,11 @@ def _get_storage_service(service): else: raise NotImplementedError(f"Service {service} not implemented") -def _get_data_type(data_path): - if os.path.isdir(data_path): + +def _get_data_type(data_path, byte_data_flag): + if byte_data_flag: + return DataType.BYTE + elif os.path.isdir(data_path): return DataType.DIRECTORY elif os.path.isfile(data_path): return DataType.FILE From 8661dca89fc783b357787198dada17abbda89259 Mon Sep 17 00:00:00 2001 From: bhargav191098 Date: Fri, 21 Jun 2024 17:41:28 -0700 Subject: [PATCH 2/3] simplified way to upload byte data + encrypted_url + new_backend_apis --- python/fedml/api/modules/storage.py | 218 +++++++++++------- python/fedml/api/modules/utils.py | 12 +- .../scheduler/master/server_constants.py | 6 +- .../scheduler_entry/resource_manager.py | 5 +- 4 files changed, 141 insertions(+), 100 deletions(-) diff --git a/python/fedml/api/modules/storage.py b/python/fedml/api/modules/storage.py index 736d5509e4..657df98e76 100644 --- a/python/fedml/api/modules/storage.py +++ b/python/fedml/api/modules/storage.py @@ -23,74 +23,40 @@ def __init__(self, data: dict): self.tags = data.get("description", None) self.createdAt = data.get("createTime", None) self.updatedAt = data.get("updateTime", None) - self.size = _get_size(data.get("fileSize",None)) + self.size = _get_size(data.get("fileSize", None)) self.tag_list = data.get("tags", None) self.download_url = data.get("fileUrl", None) + class DataType(Enum): FILE = "file" DIRECTORY = "directory" - BYTE = "byte" INVALID = "invalid" - # Todo (alaydshah): Store service name in metadata # Todo (alaydshah): If data already exists, don't upload again. Instead suggest to use update command # Todo (bhargav) : Discuss and remove the service variable. Maybe needed sometime later. def upload(data_path, api_key, name, description, tag_list, service, show_progress, out_progress_to_err, progress_desc, - metadata, byte_data_flag=False, byte_data=None) -> FedMLResponse: - api_key = authenticate(api_key) - + metadata, encrypted_api_key_flag=False, byte_data=None) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: return FedMLResponse(code=ResponseCode.FAILURE, message=message) - data_type = _get_data_type(data_path, byte_data_flag) - - if data_type == DataType.INVALID: - return FedMLResponse(code=ResponseCode.FAILURE,message="Invalid data path") + if byte_data: + file_uploaded_url, message, file_size = _upload_bytes(api_key, user_id, name, show_progress, + out_progress_to_err, progress_desc, metadata, + byte_data, encrypted_api_key_flag=encrypted_api_key_flag) - if data_type == DataType.DIRECTORY: - to_upload_path, message = _archive_data(data_path) - name = os.path.splitext(os.path.basename(to_upload_path))[0] if name is None else name - file_name = name + ".zip" else: - to_upload_path = data_path - base_name = os.path.basename(to_upload_path) - file_extension = os.path.splitext(base_name)[1] - given_extension = None - if name is not None: - given_extension = os.path.splitext(name)[1] - if given_extension is None or given_extension == "": - name = name + file_extension - else: - name = base_name + file_uploaded_url, message, file_size, name = _upload_file(api_key, user_id, name, data_path, show_progress, + out_progress_to_err, progress_desc, metadata, + encrypted_api_key_flag=encrypted_api_key_flag) - file_name = name - - if not to_upload_path and not byte_data_flag: - return FedMLResponse(code=ResponseCode.FAILURE, message=message) - - #TODO(bhargav191098) - Better done on the backend. Remove and pass file_name once completed on backend. - dest_path = os.path.join(user_id, file_name) - max_chunk_size = 20 * 1024 * 1024 - - if byte_data_flag: - file_size = sum(len(chunk) for chunk in get_chunks_from_byte_data(byte_data, max_chunk_size)) - - else: - file_size = os.path.getsize(to_upload_path) - - file_uploaded_url, message = _upload_multipart(api_key, dest_path, file_size, max_chunk_size, to_upload_path, show_progress, - out_progress_to_err, - progress_desc, metadata, byte_data_flag, byte_data) - - if data_type == "dir": - os.remove(to_upload_path) if not file_uploaded_url: - return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {to_upload_path}") + return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {data_path}") json_data = { "datasetName": name, @@ -101,7 +67,8 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre } try: - response = _create_dataset(api_key=api_key, json_data=json_data) + response = _create_dataset(api_key=api_key, json_data=json_data, encrypted_api_key_flag=encrypted_api_key_flag) + print("create dataset ", response) code, message, data = _get_data_from_response(message="Failed to upload data", response=response) except Exception as e: return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to create dataset: {e}") @@ -112,14 +79,14 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre # Todo(alaydshah): Query service from object metadata -def download(data_name, api_key, service, dest_path, show_progress=True) -> FedMLResponse: - api_key = authenticate(api_key) +def download(data_name, api_key, service, dest_path, show_progress=True, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: return FedMLResponse(code=ResponseCode.FAILURE, message=message) - metadata_response = get_metadata(data_name, api_key) + metadata_response = get_metadata(data_name, api_key, encrypted_api_key_flag=encrypted_api_key_flag) if metadata_response.code == ResponseCode.SUCCESS: metadata = metadata_response.data if not metadata or not isinstance(metadata, StorageMetadata): @@ -129,7 +96,7 @@ def download(data_name, api_key, service, dest_path, show_progress=True) -> FedM download_url = metadata.download_url given_extension = os.path.splitext(data_name)[1] is_file = True - if(given_extension is None or given_extension ==""): + if (given_extension is None or given_extension == ""): is_file = False if not is_file: @@ -146,7 +113,7 @@ def download(data_name, api_key, service, dest_path, show_progress=True) -> FedM else: if not os.path.exists(dest_path): os.makedirs(dest_path) - shutil.move(path_local,dest_path) + shutil.move(path_local, dest_path) abs_dest_path = os.path.abspath(dest_path) return FedMLResponse(code=ResponseCode.SUCCESS, message=f"Successfully downloaded and unzipped data at " f"{abs_dest_path}", data=abs_dest_path) @@ -185,11 +152,12 @@ def get_user_metadata(data_name, api_key=None) -> FedMLResponse: return FedMLResponse(code=ResponseCode.SUCCESS, message=message, data=data) -def get_metadata(data_name, api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def get_metadata(data_name, api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) try: - response = _get_dataset_metadata(api_key=api_key, data_name=data_name) + response = _get_dataset_metadata(api_key=api_key, data_name=data_name, + encrypted_api_key_flag=encrypted_api_key_flag) code, message, data = _get_data_from_response(message="Failed to upload data", response=response) except Exception as e: return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to get metadata of '{data_name}' with " @@ -206,10 +174,10 @@ def get_metadata(data_name, api_key=None) -> FedMLResponse: return FedMLResponse(code=code, message=message) -def list_objects(api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def list_objects(api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) try: - response = _list_dataset(api_key=api_key) + response = _list_dataset(api_key=api_key, encrypted_api_key_flag=encrypted_api_key_flag) except Exception as e: message = f"Failed to list stored objects for account linked with api_key {api_key} with exception {e}" logging.error(message) @@ -258,6 +226,7 @@ def delete(data_name, service, api_key=None) -> FedMLResponse: logging.error(message, data_name, service) return FedMLResponse(code=ResponseCode.FAILURE, message=message, data=False) + def _get_num_chunks(file_size, max_chunk_size): num_chunks = math.ceil(file_size / max_chunk_size) return num_chunks @@ -271,6 +240,7 @@ def get_chunks(file_path, chunk_size): break yield chunk + def get_chunks_from_byte_data(byte_data, chunk_size): while True: chunk = byte_data.read(chunk_size) @@ -279,10 +249,11 @@ def get_chunks_from_byte_data(byte_data, chunk_size): yield chunk -def _get_presigned_url(api_key, request_url, file_name, part_number=None): +def _get_presigned_url(api_key, request_url, file_name, part_number=None, encrypted_api_key_flag=False): cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) params_dict = {'fileKey': file_name} if part_number is not None: params_dict['partNumber'] = part_number @@ -298,15 +269,15 @@ def _get_presigned_url(api_key, request_url, file_name, part_number=None): return response -def _upload_part(url,part_data,session): - response = session.put(url,data=part_data,verify=True) +def _upload_part(url, part_data, session): + response = session.put(url, data=part_data, verify=True) return response -def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20,session=None, byte_data_flag= False): +def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20, session=None): for retry_attempt in range(max_retries): try: - response = _upload_part(presigned_url,chunk,session) + response = _upload_part(presigned_url, chunk, session) except requests.exceptions.RequestException as e: if retry_attempt < max_retries: continue @@ -339,13 +310,14 @@ def _process_post_response(response): return data_url, "Successfully uploaded the data! " -def _complete_multipart_upload(api_key, file_key, part_info, upload_id): +def _complete_multipart_upload(api_key, file_key, part_info, upload_id, encrypted_api_key_flag=False): complete_multipart_url = ServerConstants.get_complete_multipart_upload_url() body_dict = {"fileKey": file_key, 'partETags': part_info, 'uploadId': upload_id} cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is None: try: requests.session().verify = cert_path @@ -362,8 +334,9 @@ def _complete_multipart_upload(api_key, file_key, part_info, upload_id): return _process_post_response(complete_multipart_response) -def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive_path, show_progress, out_progress_to_err, - progress_desc_text, metadata, byte_data_flag, byte_data): +def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, show_progress, + out_progress_to_err, + progress_desc_text, chunks, encrypted_api_key_flag=False): request_url = ServerConstants.get_presigned_multi_part_url() num_chunks = _get_num_chunks(file_size, max_chunk_size) @@ -371,7 +344,8 @@ def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive upload_id = "" presigned_urls = [] - presigned_url_response = _get_presigned_url(api_key, request_url, file_key, num_chunks) + presigned_url_response = _get_presigned_url(api_key, request_url, file_key, num_chunks, + encrypted_api_key_flag=encrypted_api_key_flag) if presigned_url_response.status_code != 200: message = (f"Failed to get presigned URL with status code = {presigned_url_response.status_code}, " @@ -392,12 +366,6 @@ def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive upload_id = data['uploadId'] presigned_urls = data['urls'] - if byte_data_flag: - byte_data.seek(0) - chunks = get_chunks_from_byte_data(byte_data, max_chunk_size) - else: - chunks = get_chunks(archive_path, max_chunk_size) - part_info = [] chunk_count = 0 successful_chunks = 0 @@ -413,7 +381,7 @@ def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive if show_progress: try: part_data = _upload_chunk(presigned_url=presigned_url, chunk=chunk, part=part, - pbar=pbar,session=atomic_session, byte_data_flag = byte_data_flag) + pbar=pbar, session=atomic_session) part_info.append(part_data) successful_chunks += 1 except Exception as e: @@ -422,18 +390,87 @@ def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive else: try: part_data = _upload_chunk(presigned_url=presigned_url, chunk=chunk, part=part, - pbar=pbar,session=atomic_session) + pbar=pbar, session=atomic_session) part_info.append(part_data) successful_chunks += 1 except Exception as e: return None, "unsuccessful" if successful_chunks == chunk_count: - return _complete_multipart_upload(api_key, file_key, part_info, upload_id) + return _complete_multipart_upload(api_key, file_key, part_info, upload_id, + encrypted_api_key_flag=encrypted_api_key_flag) else: return None, "Unsuccessful!" +def _upload_bytes(api_key, user_id, file_name, + show_progress, out_progress_to_err, progress_desc, metadata, byte_data, encrypted_api_key_flag=False): + if file_name is None: + return None, "name cannot be None" + + dest_path = os.path.join(user_id, file_name) + max_chunk_size = 20 * 1024 * 1024 + + file_size = sum(len(chunk) for chunk in get_chunks_from_byte_data(byte_data, max_chunk_size)) + + byte_data.seek(0) + chunks = get_chunks_from_byte_data(byte_data, max_chunk_size) + + file_uploaded_url, message = _upload_multipart(api_key, dest_path, file_size, max_chunk_size, + show_progress, + out_progress_to_err, + progress_desc, chunks, encrypted_api_key_flag=encrypted_api_key_flag) + + return file_uploaded_url, message, file_size + + +def _upload_file(api_key, user_id, name, data_path, show_progress, out_progress_to_err, progress_desc, + metadata, encrypted_api_key_flag=False): + data_type = _get_data_type(data_path) + message = "" + + if data_type == DataType.INVALID: + return FedMLResponse(code=ResponseCode.FAILURE, message="Invalid data path") + + if data_type == DataType.DIRECTORY: + to_upload_path, message = _archive_data(data_path) + name = os.path.splitext(os.path.basename(to_upload_path))[0] if name is None else name + file_name = name + ".zip" + else: + to_upload_path = data_path + base_name = os.path.basename(to_upload_path) + file_extension = os.path.splitext(base_name)[1] + if name is not None: + given_extension = os.path.splitext(name)[1] + if given_extension is None or given_extension == "": + name = name + file_extension + else: + name = base_name + + file_name = name + + if not to_upload_path: + return FedMLResponse(code=ResponseCode.FAILURE, message=message) + + # TODO(bhargav191098) - Better done on the backend. Remove and pass file_name once completed on backend. + dest_path = os.path.join(user_id, file_name) + max_chunk_size = 20 * 1024 * 1024 + + file_size = os.path.getsize(to_upload_path) + + chunks = get_chunks(to_upload_path, max_chunk_size) + + file_uploaded_url, message = _upload_multipart(api_key, dest_path, file_size, max_chunk_size, + show_progress, + out_progress_to_err, + progress_desc, chunks, encrypted_api_key_flag=encrypted_api_key_flag) + + if data_type == DataType.DIRECTORY: + os.remove(to_upload_path) + + return file_uploaded_url, message, file_size, name + + def _download_using_presigned_url(url, fname, chunk_size=1024 * 1024, show_progress=True): download_response = requests.get(url, verify=True, stream=True) if download_response.status_code == 200: @@ -456,6 +493,7 @@ def _download_using_presigned_url(url, fname, chunk_size=1024 * 1024, show_progr return True return False + def _get_user_id_from_api_key(api_key: str) -> (str, str): user_url = ServerConstants.get_user_url() json_data = { @@ -492,10 +530,8 @@ def _get_storage_service(service): raise NotImplementedError(f"Service {service} not implemented") -def _get_data_type(data_path, byte_data_flag): - if byte_data_flag: - return DataType.BYTE - elif os.path.isdir(data_path): +def _get_data_type(data_path): + if os.path.isdir(data_path): return DataType.DIRECTORY elif os.path.isfile(data_path): return DataType.FILE @@ -517,11 +553,12 @@ def _archive_data(data_path: str) -> (str, str): return None, f"Error archiving data: {e}" -def _create_dataset(api_key: str, json_data: dict) -> requests.Response: +def _create_dataset(api_key: str, json_data: dict, encrypted_api_key_flag=False) -> requests.Response: dataset_url = ServerConstants.get_dataset_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is not None: try: requests.session().verify = cert_path @@ -540,11 +577,12 @@ def _create_dataset(api_key: str, json_data: dict) -> requests.Response: return response -def _list_dataset(api_key: str) -> requests.Response: +def _list_dataset(api_key: str, encrypted_api_key_flag=False) -> requests.Response: list_dataset_url = ServerConstants.list_dataset_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is None: try: requests.session().verify = cert_path @@ -557,11 +595,12 @@ def _list_dataset(api_key: str) -> requests.Response: return response -def _get_dataset_metadata(api_key: str, data_name: str) -> requests.Response: +def _get_dataset_metadata(api_key: str, data_name: str, encrypted_api_key_flag=False) -> requests.Response: dataset_metadata_url = ServerConstants.get_dataset_metadata_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is not None: try: requests.session().verify = cert_path @@ -624,19 +663,20 @@ def _get_data_from_response(message: str, response: requests.Response) -> (Respo return ResponseCode.SUCCESS, "Successfully parsed data from response", data -def _get_size(size_in_bytes:str)->str: + +def _get_size(size_in_bytes: str) -> str: size_str = "" - if(size_in_bytes): + if (size_in_bytes): size = int(size_in_bytes) size_in_gb = size / (1024 * 1024 * 1024) size_in_mb = size / (1024 * 1024) size_in_kb = size / 1024 - if(size_in_gb >= 1): + if (size_in_gb >= 1): size_str = f"{size_in_gb:.2f} GB" - elif(size_in_mb >= 1): + elif (size_in_mb >= 1): size_str = f"{size_in_mb:.2f} MB" - elif(size_in_kb >= 1): + elif (size_in_kb >= 1): size_str = f"{size_in_kb:.2f} KB" else: size_str = f"{size} B" - return size_str \ No newline at end of file + return size_str diff --git a/python/fedml/api/modules/utils.py b/python/fedml/api/modules/utils.py index 76801ffe81..d1d555d0f2 100644 --- a/python/fedml/api/modules/utils.py +++ b/python/fedml/api/modules/utils.py @@ -9,15 +9,15 @@ FEDML_MLOPS_BUILD_PRE_IGNORE_LIST = 'dist-packages,client-package.zip,server-package.zip,__pycache__,*.pyc,*.git' -def fedml_login(api_key): - api_key_is_valid, api_key = _check_api_key(api_key=api_key) +def fedml_login(api_key, encrypted_api_key_flag=False): + api_key_is_valid, api_key = _check_api_key(api_key=api_key, encrypted_api_key_flag=encrypted_api_key_flag) if api_key_is_valid: return 0, api_key return -1, api_key -def _check_api_key(api_key=None): +def _check_api_key(api_key=None, encrypted_api_key_flag=False): if api_key is None or api_key == "": saved_api_key = get_api_key() if saved_api_key is None or saved_api_key == "": @@ -25,7 +25,7 @@ def _check_api_key(api_key=None): else: api_key = saved_api_key - is_valid_heartbeat = FedMLResourceManager.get_instance().check_heartbeat(api_key) + is_valid_heartbeat = FedMLResourceManager.get_instance().check_heartbeat(api_key, encrypted_api_key_flag) if not is_valid_heartbeat: return False, api_key else: @@ -33,9 +33,9 @@ def _check_api_key(api_key=None): return True, api_key -def authenticate(api_key): +def authenticate(api_key, encrypted_api_key_flag=False): - error_code, api_key = fedml_login(api_key) + error_code, api_key = fedml_login(api_key, encrypted_api_key_flag) # Exit if not able to authenticate successfully if error_code: diff --git a/python/fedml/computing/scheduler/master/server_constants.py b/python/fedml/computing/scheduler/master/server_constants.py index ebd8b2aef6..2c59fbedfd 100644 --- a/python/fedml/computing/scheduler/master/server_constants.py +++ b/python/fedml/computing/scheduler/master/server_constants.py @@ -251,7 +251,7 @@ def get_user_url(): @staticmethod def get_dataset_url(): - create_dataset_url = "{}/fedmlOpsServer/api/v1/cli/dataset".format( + create_dataset_url = "{}/system/api/v1/cli/storage".format( ServerConstants.get_mlops_url()) return create_dataset_url @@ -271,13 +271,13 @@ def get_complete_multipart_upload_url(): @staticmethod def list_dataset_url(): - list_dataset_url = "{}/fedmlOpsServer/api/v1/cli/dataset/list".format( + list_dataset_url = "{}/system/api/v1/cli/storage/list".format( ServerConstants.get_mlops_url()) return list_dataset_url @staticmethod def get_dataset_metadata_url(): - get_dataset_metadata_url = "{}/fedmlOpsServer/api/v1/cli/dataset/meta".format( + get_dataset_metadata_url = "{}/system/api/v1/cli/storage/meta".format( ServerConstants.get_mlops_url()) return get_dataset_metadata_url diff --git a/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py b/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py index 615b77041d..3d910da3aa 100644 --- a/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py +++ b/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py @@ -15,9 +15,10 @@ def __init__(self): def get_instance(): return FedMLResourceManager() - def check_heartbeat(self, api_key): + def check_heartbeat(self, api_key, encrypted_api_key_flag=False): heartbeat_url = ServerConstants.get_heartbeat_url() - heartbeat_api_headers = {'Content-Type': 'application/json', 'Connection': 'close'} + heartbeat_api_headers = {'Content-Type': 'application/json', 'Connection': 'close', + 'Encrypted': str(encrypted_api_key_flag)} heartbeat_json = { "apiKey": api_key } From c00ac6a2ecef124bfbfb1967569e1ec3b9984529 Mon Sep 17 00:00:00 2001 From: bhargav191098 Date: Fri, 21 Jun 2024 17:49:07 -0700 Subject: [PATCH 3/3] some storage function missed flag - all updated --- python/fedml/api/modules/storage.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/python/fedml/api/modules/storage.py b/python/fedml/api/modules/storage.py index 657df98e76..5bf64a441b 100644 --- a/python/fedml/api/modules/storage.py +++ b/python/fedml/api/modules/storage.py @@ -132,8 +132,8 @@ def download(data_name, api_key, service, dest_path, show_progress=True, encrypt return FedMLResponse(code=ResponseCode.FAILURE, message=error_message) -def get_user_metadata(data_name, api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def get_user_metadata(data_name, api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: @@ -195,8 +195,8 @@ def list_objects(api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: # Todo(alaydshah): Query service from object metadata. Make the transaction atomic or rollback if partially failed -def delete(data_name, service, api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def delete(data_name, service, api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: @@ -209,7 +209,8 @@ def delete(data_name, service, api_key=None) -> FedMLResponse: if result: logging.info(f"Successfully deleted object from storage service.") try: - response = _delete_dataset(api_key=api_key, data_name=data_name) + response = _delete_dataset(api_key=api_key, data_name=data_name, + encrypted_api_key_flag=encrypted_api_key_flag) code, message, data = _get_data_from_response(message="Failed to delete data", response=response) except Exception as e: message = (f"Deleted object from storage service but failed to delete object metadata from Nexus Backend " @@ -622,11 +623,12 @@ def _get_dataset_metadata(api_key: str, data_name: str, encrypted_api_key_flag=F return response -def _delete_dataset(api_key: str, data_name: str) -> requests.Response: +def _delete_dataset(api_key: str, data_name: str, encrypted_api_key_flag=False) -> requests.Response: dataset_url = ServerConstants.get_dataset_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is not None: try: requests.session().verify = cert_path