Skip to content

Commit

Permalink
Fixing formatting errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
albags committed Jul 16, 2024
1 parent e6ca77e commit f8ecba7
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 80 deletions.
117 changes: 79 additions & 38 deletions interactive_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-

"""
This script provides an interactive console application for uploading files to an S3 bucket using presigned URLs.
This script provides an interactive console application for uploading files to an S3 bucket using presigned URLs.
It guides the user through selecting the deployment and data type and then uploads the specified files.
"""

Expand Down Expand Up @@ -31,7 +31,7 @@

def clear_screen():
    """Clear the terminal screen (``cls`` on Windows, ``clear`` elsewhere)."""
    # The scraped diff carried both the pre- and post-format lines; keep
    # exactly one shell invocation so the screen is cleared once, not twice.
    os.system("cls" if os.name == "nt" else "clear")


def get_input(prompt):
Expand Down Expand Up @@ -68,11 +68,14 @@ def display_menu():

fullname = get_input("\nYour Full Name")

countries = list(set([d["country"] for d in all_deployments if d["status"] == "active"]))
countries = list(
set([d["country"] for d in all_deployments if d["status"] == "active"])
)
country = get_choice("Countries:", countries)

country_deployments = [
f"{d['location_name']} - {d['camera_id']}" for d in all_deployments
f"{d['location_name']} - {d['camera_id']}"
for d in all_deployments
if d["country"] == country and d["status"] == "active"
]
deployment = get_choice("\nDeployments:", country_deployments)
Expand All @@ -88,7 +91,11 @@ def display_menu():
break
print("Invalid directory. Please try again.")

files = [os.path.join(directory_path, f) for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]
files = [
os.path.join(directory_path, f)
for f in os.listdir(directory_path)
if os.path.isfile(os.path.join(directory_path, f))
]
directory_path = pathlib.Path(directory_path)
files = list(directory_path.rglob(f"*{extension}"))

Expand All @@ -101,12 +108,25 @@ def display_menu():
print(f"Number of files: {len(files)}")

confirm = get_input("\nUpload files? (yes/no)")
if confirm.lower() == 'yes':
if confirm.lower() == "yes":
# print("\nUploading files...")
s3_bucket_name = [d["country_code"] for d in all_deployments if d["country"] == country and d["status"] == "active"][0].lower()
s3_bucket_name = [
d["country_code"]
for d in all_deployments
if d["country"] == country and d["status"] == "active"
][0].lower()
location_name, camera_id = deployment.split(" - ")
dep_id = [d["deployment_id"] for d in all_deployments if d["country"] == country and d["location_name"] == location_name and d["camera_id"] == camera_id and d["status"] == "active"][0]
asyncio.run(upload_files_in_batches(fullname, s3_bucket_name, dep_id, data_type, files))
dep_id = [
d["deployment_id"]
for d in all_deployments
if d["country"] == country
and d["location_name"] == location_name
and d["camera_id"] == camera_id
and d["status"] == "active"
][0]
asyncio.run(
upload_files_in_batches(fullname, s3_bucket_name, dep_id, data_type, files)
)
# print("Files uploaded successfully!")
prompt_next_action()
else:
Expand All @@ -116,7 +136,9 @@ def display_menu():
def prompt_next_action():
"""Prompt the user for the next action: upload more files or leave."""
while True:
next_action = get_choice("\nWhat do you want to do next?", ["Upload more files", "Leave"])
next_action = get_choice(
"\nWhat do you want to do next?", ["Upload more files", "Leave"]
)
if next_action == "Upload more files":
display_menu()
elif next_action == "Leave":
Expand All @@ -129,16 +151,17 @@ def prompt_next_action():
def get_file_info(file_path):
    """Return ``(filename, mime_type)`` for *file_path*.

    The MIME type is guessed from the file extension via ``mimetypes``;
    extensions with no registered type fall back to the generic
    ``application/octet-stream``. Only one guess is performed (the raw diff
    span duplicated this assignment).
    """
    filename = os.path.basename(file_path)
    file_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"
    return filename, file_type


def get_deployments():
"""Fetch deployments from the API with authentication."""
try:
url = "https://connect-apps.ceh.ac.uk/ami-data-upload/get-deployments/"
response = requests.get(url, auth=HTTPBasicAuth(GLOBAL_USERNAME, GLOBAL_PASSWORD),
timeout=600)
response = requests.get(
url, auth=HTTPBasicAuth(GLOBAL_USERNAME, GLOBAL_PASSWORD), timeout=600
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as err:
Expand All @@ -151,31 +174,42 @@ def get_deployments():
sys.exit(1)


async def upload_files_in_batches(name, bucket, dep_id, data_type, files, batch_size=100):
async def upload_files_in_batches(
name, bucket, dep_id, data_type, files, batch_size=100
):
"""Upload files in batches."""
async with aiohttp.ClientSession(timeout=ClientTimeout(total=1200)) as session:
while True:
print()
progress_exist = tqdm.asyncio.tqdm(total=len(files), desc='Checking if files already in server')
files_to_upload = await check_files(session, name,
bucket, dep_id, data_type,
files, progress_exist)
progress_exist = tqdm.asyncio.tqdm(
total=len(files), desc="Checking if files already in server"
)
files_to_upload = await check_files(
session, name, bucket, dep_id, data_type, files, progress_exist
)
progress_exist.close()
print(f"{len(files_to_upload)} files missing from the server. The upload will start in a few moments...")
print(
f"{len(files_to_upload)} files missing from the server. The upload will start in a few moments..."
)
print()

if not files_to_upload:
print("All files have been uploaded successfully.")
break

progress_bar = tqdm.asyncio.tqdm(total=len(files_to_upload), desc='Uploading files')
progress_bar = tqdm.asyncio.tqdm(
total=len(files_to_upload), desc="Uploading files"
)

if len(files_to_upload) <= batch_size:
await upload_files(session, name, bucket, dep_id, data_type, files_to_upload)
await upload_files(
session, name, bucket, dep_id, data_type, files_to_upload
)
progress_bar.update(len(files_to_upload))
else:
for i in range(0, len(files_to_upload), batch_size):
batch = files_to_upload[i:i + batch_size]
end = i + batch_size
batch = files_to_upload[i:end]
await upload_files(session, name, bucket, dep_id, data_type, batch)
progress_bar.update(len(batch))

Expand All @@ -190,7 +224,9 @@ async def check_files(session, name, bucket, dep_id, data_type, files, progress_
files_to_upload = []

for file_path in files:
if not await check_file_exist(session, name, bucket, dep_id, data_type, file_path):
if not await check_file_exist(
session, name, bucket, dep_id, data_type, file_path
):
files_to_upload.append(file_path)
progress_exist.update(1)

Expand All @@ -208,8 +244,9 @@ async def check_file_exist(session, name, bucket, dep_id, data_type, file_path):
data.add_field("data_type", data_type)
data.add_field("filename", file_name)

async with session.post(url, auth=BasicAuth(GLOBAL_USERNAME, GLOBAL_PASSWORD),
data=data) as response:
async with session.post(
url, auth=BasicAuth(GLOBAL_USERNAME, GLOBAL_PASSWORD), data=data
) as response:
response.raise_for_status()
exist = await response.json()
return exist["exists"]
Expand All @@ -221,11 +258,10 @@ async def upload_files(session, name, bucket, dep_id, data_type, files):
for file_path in files:
file_name, file_type = get_file_info(file_path)
try:
presigned_url = await get_presigned_url(session, name, bucket,
dep_id, data_type,
file_name, file_type)
task = upload_file_to_s3(session, presigned_url,
file_path, file_type)
presigned_url = await get_presigned_url(
session, name, bucket, dep_id, data_type, file_name, file_type
)
task = upload_file_to_s3(session, presigned_url, file_path, file_type)
tasks.append(task)
except Exception as e:
print(f"Error getting presigned URL for {file_name}: {e}")
Expand All @@ -234,7 +270,9 @@ async def upload_files(session, name, bucket, dep_id, data_type, files):


@retry(wait=wait_fixed(2), stop=stop_after_attempt(5))
async def get_presigned_url(session, name, bucket, dep_id, data_type, file_name, file_type):
async def get_presigned_url(
session, name, bucket, dep_id, data_type, file_name, file_type
):
"""Get a presigned URL for file upload."""
url = "https://connect-apps.ceh.ac.uk/ami-data-upload/generate-presigned-url/"

Expand All @@ -246,28 +284,31 @@ async def get_presigned_url(session, name, bucket, dep_id, data_type, file_name,
data.add_field("filename", file_name)
data.add_field("file_type", file_type)

async with session.post(url, auth=BasicAuth(GLOBAL_USERNAME, GLOBAL_PASSWORD),
data=data) as response:
async with session.post(
url, auth=BasicAuth(GLOBAL_USERNAME, GLOBAL_PASSWORD), data=data
) as response:
response.raise_for_status()
return await response.json()


@retry(wait=wait_fixed(2), stop=stop_after_attempt(5))
async def upload_file_to_s3(session, presigned_url, file_path, file_type):
    """Upload one file's bytes to S3 via PUT on the presigned URL.

    The whole file is read into memory and sent with its MIME type as
    ``Content-Type`` (the presigned URL signature typically covers it).
    Network-level failures are deliberately swallowed — the caller's
    "check then re-upload" loop picks up files that did not make it.
    A missing file or any unexpected error is reported on stdout.

    NOTE(review): ``aiohttp.ClientError`` (which includes the error raised
    by ``raise_for_status``) is caught inside the body, so the ``@retry``
    decorator only fires on exceptions that escape — confirm this is the
    intended retry behavior.
    """
    headers = {"Content-Type": file_type}
    try:
        with open(file_path, "rb") as file:
            data = file.read()
        async with session.put(
            presigned_url, data=data, headers=headers
        ) as response:
            response.raise_for_status()
            await response.text()  # drain the body so the connection is reusable
    except aiohttp.ClientError:
        # Best-effort upload: the outer batch loop re-checks the server
        # and retries anything still missing.
        pass
    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ requests
tenacity
tqdm
yaspin
flake8-black
55 changes: 35 additions & 20 deletions s3_bucket_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@


# Load AWS credentials and S3 bucket name from config file.
# Expected keys (see usage below): AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
# AWS_URL_ENDPOINT, AWS_REGION.
with open("credentials.json", encoding="utf-8") as config_file:
    aws_credentials = json.load(config_file)

# Initialize S3 client against the configured (non-default) endpoint.
s3_client = boto3.client(
    "s3",
    aws_access_key_id=aws_credentials["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=aws_credentials["AWS_SECRET_ACCESS_KEY"],
    endpoint_url=aws_credentials["AWS_URL_ENDPOINT"],
    region_name=aws_credentials["AWS_REGION"],
)


Expand All @@ -40,7 +40,9 @@ def get_deployments():
"""Fetch deployments from the API with authentication."""
try:
url = "https://connect-apps.ceh.ac.uk/ami-data-upload/get-deployments/"
response = requests.get(url, auth=HTTPBasicAuth(GLOBAL_USERNAME, GLOBAL_PASSWORD), timeout=600)
response = requests.get(
url, auth=HTTPBasicAuth(GLOBAL_USERNAME, GLOBAL_PASSWORD), timeout=600
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as err:
Expand All @@ -55,7 +57,7 @@ def get_deployments():

def clear_screen():
    """Clear the terminal screen (``cls`` on Windows, ``clear`` elsewhere)."""
    # Keep a single shell invocation; the raw diff span duplicated this line.
    os.system("cls" if os.name == "nt" else "clear")


def get_input(prompt):
Expand All @@ -82,13 +84,10 @@ def number_of_files(s3_bucket_name, prefix):
Count number of files for country, deployment and data type.
"""
# Create a paginator helper for list_objects_v2
paginator = s3_client.get_paginator('list_objects_v2')
paginator = s3_client.get_paginator("list_objects_v2")

# Define the parameters for the pagination operation
operation_parameters = {
'Bucket': s3_bucket_name,
'Prefix': prefix
}
operation_parameters = {"Bucket": s3_bucket_name, "Prefix": prefix}

# Create an iterator for the paginated response
page_iterator = paginator.paginate(**operation_parameters)
Expand All @@ -99,7 +98,7 @@ def number_of_files(s3_bucket_name, prefix):
# Iterate through each page in the paginated response
for page in page_iterator:
# Add the number of keys in the current page to the total count
count += page['KeyCount']
count += page["KeyCount"]

return count

Expand All @@ -118,21 +117,35 @@ def display_menu():

all_deployments = get_deployments()

countries = list(set([d["country"] for d in all_deployments if d["status"] == "active"]))
countries = list(
set([d["country"] for d in all_deployments if d["status"] == "active"])
)
country = get_choice("Countries:", countries)

country_deployments = [
f"{d['location_name']} - {d['camera_id']}" for d in all_deployments
f"{d['location_name']} - {d['camera_id']}"
for d in all_deployments
if d["country"] == country and d["status"] == "active"
]
deployment = get_choice("\nDeployments:", country_deployments)

data_types = ["snapshot_images", "audible_recordings", "ultrasound_recordings"]
data_type = get_choice("\nData type:", data_types)

s3_bucket_name = [d["country_code"] for d in all_deployments if d["country"] == country and d["status"] == "active"][0].lower()
s3_bucket_name = [
d["country_code"]
for d in all_deployments
if d["country"] == country and d["status"] == "active"
][0].lower()
location_name, camera_id = deployment.split(" - ")
dep_id = [d["deployment_id"] for d in all_deployments if d["country"] == country and d["location_name"] == location_name and d["camera_id"] == camera_id and d["status"] == "active"][0]
dep_id = [
d["deployment_id"]
for d in all_deployments
if d["country"] == country
and d["location_name"] == location_name
and d["camera_id"] == camera_id
and d["status"] == "active"
][0]

prefix = f"{dep_id}/{data_type}"

Expand All @@ -149,7 +162,9 @@ def display_menu():
def prompt_next_action():
"""Prompt the user for the next action: check more deployments or leave."""
while True:
next_action = get_choice("\nWhat do you want to do next?", ["Check another deployment", "Leave"])
next_action = get_choice(
"\nWhat do you want to do next?", ["Check another deployment", "Leave"]
)
if next_action == "Check another deployment":
display_menu()
elif next_action == "Leave":
Expand All @@ -159,5 +174,5 @@ def prompt_next_action():
print("Invalid choice. Please try again.")


if __name__ == "__main__":
    # Entry point: launch the interactive deployment-inspection menu.
    display_menu()
Loading

0 comments on commit f8ecba7

Please sign in to comment.