feat: Added integrity check and optimized bucket interactions (#102)
* refactor: Refactored bucket services

* feat: Optimized bucket interactions

* refactor: Removed intermediate variables

* docs: Added comment

* style: Fixed lint

* refactor: Renamed route

* feat: Added file deletion to bucket service

* feat: Added data duplication and data integrity check to media upload

* fix: Fixed syntax

* fix: Fixed syntax

* refactor: Refactored import in qarnot

* fix: Fixed typo

* fix: Fixed typo

* fix: Added missing import

* fix: Fixed media integrity check

* test: Removed legacy unittest

* style: Renamed variable

* docs: Specified comments

* test: Fixed unittest for media upload

* style: Fixed lint

* test: Added unittest for bucket service

* test: Fixed unittest patching

* chore: Updated env var in docker compose

* chore: Updated workflows

* test: Extended unittest of bucket resolution

* style: Fixed syntax

* refactor: Removed unused import

* refactor: Split up bucket name and media folder env vars

* test: Reflected changes of env vars

* fix: Fixed docker compose

* chore: Reflected changes of docker compose

* refactor: Renamed bucket method

* feat: Improved file existence check using boto3 core methods

* feat: Added bucket method to generate a file temp public URL

* refactor: Removed content read route and optimized URL resolution

* refactor: Reflected changes on API

* docs: Updated readme

* refactor: Removed background task

* refactor: Removed unused import
frgfm authored Dec 10, 2020
1 parent d70e271 commit 0edaf2d
Showing 16 changed files with 230 additions and 164 deletions.
12 changes: 8 additions & 4 deletions .github/workflows/client-main.yml
@@ -38,6 +38,14 @@ jobs:
needs: pkg-install
steps:
- uses: actions/checkout@v2
- name: Build & run API server
env:
QARNOT_TOKEN: ${{ secrets.QARNOT_TOKEN }}
BUCKET_NAME: ${{ secrets.BUCKET_NAME }}
BUCKET_MEDIA_FOLDER: ${{ secrets.BUCKET_MEDIA_FOLDER }}
run: |
PORT=8002 docker-compose up -d --build
docker ps
- name: Set up Python 3.7
uses: actions/setup-python@v1
with:
@@ -55,10 +63,6 @@ jobs:
${{ runner.os }}-deps-${{ env.cache-name }}-
${{ runner.os }}-deps-
${{ runner.os }}-
- name: Build & run API server
run: |
PORT=8002 docker-compose up -d --build
docker ps
- name: Install dependencies
run: |
python -m pip install --upgrade pip
8 changes: 8 additions & 0 deletions .github/workflows/main.yml
@@ -53,6 +53,10 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Build & run docker
env:
QARNOT_TOKEN: ${{ secrets.QARNOT_TOKEN }}
BUCKET_NAME: ${{ secrets.BUCKET_NAME }}
BUCKET_MEDIA_FOLDER: ${{ secrets.BUCKET_MEDIA_FOLDER }}
run: PORT=8002 docker-compose up -d --build
- name: Install dependencies in docker
run: |
@@ -78,6 +82,10 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Build & run docker
env:
QARNOT_TOKEN: ${{ secrets.QARNOT_TOKEN }}
BUCKET_NAME: ${{ secrets.BUCKET_NAME }}
BUCKET_MEDIA_FOLDER: ${{ secrets.BUCKET_MEDIA_FOLDER }}
run: |
PORT=8002 docker-compose up -d --build
docker ps
12 changes: 11 additions & 1 deletion README.md
@@ -40,7 +40,17 @@ pip install -r pyro-api/requirements.txt

## Usage

If you wish to deploy this project on a server hosted remotely, you might want to be using [Docker](https://www.docker.com/) containers. You can perform the same using this command:
If you wish to deploy this project on a server hosted remotely, you might want to use [Docker](https://www.docker.com/) containers. Beforehand, you will need to set a few environment variables, either manually or by writing an `.env` file in the root directory of this project, as in the example below:

```
QARNOT_TOKEN=my_very_secret_token
BUCKET_NAME=my_storage_bucket_name
BUCKET_MEDIA_FOLDER=my/media/subfolder
```

These values allow your API server to connect to our cloud service provider [Qarnot Computing](https://qarnot.com/), which is required for your local server to be fully operational.
Then you can run the API containers using this command:

```bash
PORT=8002 docker-compose up -d --build
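
Once the containers are up, a minimal sanity check (a sketch only: port 8002 matches the command above, and `/docs` is FastAPI's default interactive documentation path — adjust if your deployment differs):

```python
import requests

# assumption: the stack was started with PORT=8002 as shown above;
# /docs is FastAPI's default Swagger UI endpoint
resp = requests.get("http://localhost:8002/docs")
print(resp.status_code)  # expect 200 once the containers are healthy
```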
5 changes: 0 additions & 5 deletions client/pyroclient/client.py
@@ -30,7 +30,6 @@
"acknowledge-alert": "/alerts/{alert_id}/acknowledge",
"get-site-devices": "/installations/site-devices/{site_id}",
"get-media-url": "/media/{media_id}/url",
"get-media-image": "/media/{media_id}/image"
}


@@ -168,7 +167,3 @@ def get_media_url_and_read(self, media_id: int) -> Response:
""" Get the image as a url and read it"""
image_url = requests.get(self.routes["get-media-url"].format(media_id=media_id), headers=self.headers)
return requests.get(image_url.json()['url'])

def get_media_image(self, media_id: int) -> Response:
""" Get the image as a streaming file"""
return requests.get(self.routes["get-media-image"].format(media_id=media_id), headers=self.headers)
3 changes: 3 additions & 0 deletions docker-compose.yml
@@ -13,6 +13,9 @@ services:
- TEST_DATABASE_URL=postgresql://pyro_api_test:pyro_api_test@test_db/pyro_api_dev_test
- SUPERUSER_LOGIN=superuser
- SUPERUSER_PWD=superuser
- QARNOT_TOKEN=${QARNOT_TOKEN}
- BUCKET_NAME=${BUCKET_NAME}
- BUCKET_MEDIA_FOLDER=${BUCKET_MEDIA_FOLDER}
db:
image: postgres:12.1-alpine
volumes:
109 changes: 58 additions & 51 deletions src/app/api/routes/media.py
@@ -1,6 +1,5 @@
from fastapi import APIRouter, Path, Security, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from typing import List
from typing import List, Optional
from datetime import datetime

from app.api import crud
@@ -14,7 +13,8 @@
router = APIRouter()


async def check_for_media_existence(media_id, device_id=None):
async def check_media_registration(media_id: int, device_id: Optional[int] = None) -> MediaOut:
"""Checks whether the media is registered in the DB"""
filters = {"id": media_id}
if device_id is not None:
filters.update({"device_id": device_id})
@@ -89,64 +89,71 @@ async def delete_media(media_id: int = Path(..., gt=0), _=Security(get_current_a


@router.post("/{media_id}/upload", response_model=MediaOut, status_code=200)
async def upload_media(media_id: int = Path(..., gt=0),
file: UploadFile = File(...),
current_device: DeviceOut = Security(get_current_device, scopes=["device"])):
async def upload_media(
background_tasks: BackgroundTasks,
media_id: int = Path(..., gt=0),
file: UploadFile = File(...),
current_device: DeviceOut = Security(get_current_device, scopes=["device"])
):
"""
Upload a media (image or video) linked to an existing media object in the DB
"""
entry = await check_for_media_existence(media_id, current_device.id)

# Check in DB
entry = await check_media_registration(media_id, current_device.id)

# Concatenate the first 32 chars (to avoid system interactions issues) of SHA256 hash with file extension
file_name = f"{hash_content_file(file.file.read())[:32]}.{file.filename.rpartition('.')[-1]}"
file_hash = hash_content_file(file.file.read())
file_name = f"{file_hash[:32]}.{file.filename.rpartition('.')[-1]}"
# Reset byte position of the file (cf. https://fastapi.tiangolo.com/tutorial/request-files/#uploadfile)
await file.seek(0)
# If files are in a subfolder of the bucket, prepend the folder path
bucket_key = resolve_bucket_key(file_name)

upload_success = await bucket_service.upload_file(bucket_key=bucket_key,
file_binary=file.file)
if upload_success is False:
raise HTTPException(
status_code=500,
detail="The upload did not succeed"
)
entry = dict(**entry)
entry["bucket_key"] = bucket_key
return await crud.update_entry(media, MediaCreation(**entry), media_id)
# Upload if bucket_key is different (otherwise the content is the exact same)
if isinstance(entry['bucket_key'], str) and entry['bucket_key'] == bucket_key:
return await crud.get_entry(media, media_id)
else:
# Failed upload
if not await bucket_service.upload_file(bucket_key=bucket_key, file_binary=file.file):
raise HTTPException(
status_code=500,
detail="Failed upload"
)
# Data integrity check
uploaded_file = await bucket_service.get_file(bucket_key=bucket_key)
# Failed download
if uploaded_file is None:
raise HTTPException(
status_code=500,
detail="The data integrity check failed (unable to download media form bucket)"
)
# Remove temp local file
background_tasks.add_task(bucket_service.flush_tmp_file, uploaded_file)
# Check the hash
with open(uploaded_file, 'rb') as f:
upload_hash = hash_content_file(f.read())
if upload_hash != file_hash:
# Delete corrupted file
await bucket_service.delete_file(bucket_key)
raise HTTPException(
status_code=500,
detail="Data was corrupted during upload"
)

entry = dict(**entry)
entry["bucket_key"] = bucket_key
return await crud.update_entry(media, MediaCreation(**entry), media_id)


@router.get("/{media_id}/url", response_model=MediaUrl, status_code=200)
async def get_media_url(background_tasks: BackgroundTasks,
media_id: int = Path(..., gt=0),
_=Security(get_current_user, scopes=["admin"])):
"""
Retrieve the media image url
"""
media = await check_for_media_existence(media_id)
retrieved_file = await bucket_service.get_uploaded_file(bucket_key=media["bucket_key"])
if retrieved_file is False:
raise HTTPException(
status_code=500,
detail="The download did not succeed"
)
background_tasks.add_task(bucket_service.flush_after_get_uploaded_file, retrieved_file)
return {"url": retrieved_file}


@router.get("/{media_id}/image", status_code=200)
async def get_media_image(background_tasks: BackgroundTasks,
media_id: int = Path(..., gt=0),
_=Security(get_current_user, scopes=["admin"])):
"""
Retrieve the media image as encoded in bytes
"""
media = await check_for_media_existence(media_id)
retrieved_file = await bucket_service.get_uploaded_file(bucket_key=media["bucket_key"])
if retrieved_file is False:
raise HTTPException(
status_code=500,
detail="The download did not succeed"
)
background_tasks.add_task(bucket_service.flush_after_get_uploaded_file, retrieved_file)
return StreamingResponse(open(retrieved_file, 'rb'), media_type="image/jpeg")
async def get_media_url(
media_id: int = Path(..., gt=0),
_=Security(get_current_user, scopes=["admin"])
):
"""Resolve the temporary media image URL"""
# Check in DB
media = await check_media_registration(media_id)
# Check in bucket
temp_public_url = await bucket_service.get_public_url(media['bucket_key'])
return MediaUrl(url=temp_public_url)
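
For reference, a minimal sketch of the naming and integrity-check convention used in `upload_media` (assuming `hash_content_file` returns a SHA-256 hex digest of the raw bytes, as the inline comment suggests; the payload and extension below are hypothetical):

```python
import hashlib

def hash_content_file(content: bytes) -> str:
    # assumption: SHA-256 hex digest of the raw file content
    return hashlib.sha256(content).hexdigest()

original = b"raw image bytes"               # hypothetical payload
file_hash = hash_content_file(original)
bucket_file_name = f"{file_hash[:32]}.jpg"  # first 32 hash chars + original extension

# After upload, the file is downloaded again and re-hashed;
# a mismatch means the data was corrupted in transit and the copy is deleted.
downloaded = original                       # stand-in for the re-downloaded bytes
assert hash_content_file(downloaded) == file_hash
```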
1 change: 1 addition & 0 deletions src/app/config.py
@@ -32,6 +32,7 @@

QARNOT_TOKEN: str = os.getenv("QARNOT_TOKEN")
BUCKET_NAME: str = os.getenv("BUCKET_NAME")
BUCKET_MEDIA_FOLDER: Optional[str] = os.getenv("BUCKET_MEDIA_FOLDER")
DUMMY_BUCKET_FILE = "https://ec.europa.eu/jrc/sites/jrcsh/files/styles/normal-responsive/" \
+ "public/growing-risk-future-wildfires_adobestock_199370851.jpeg"

1 change: 1 addition & 0 deletions src/app/services/bucket/__init__.py
@@ -0,0 +1 @@
from .qarnot import *
22 changes: 0 additions & 22 deletions src/app/services/bucket/baseBucketService.py

This file was deleted.

18 changes: 0 additions & 18 deletions src/app/services/bucket/dummyBucketService.py

This file was deleted.

94 changes: 94 additions & 0 deletions src/app/services/bucket/qarnot.py
@@ -0,0 +1,94 @@
import os
import logging
from typing import BinaryIO, List, Optional
from fastapi import HTTPException
from qarnot.connection import Connection
from qarnot.bucket import Bucket

from app import config as cfg


__all__ = ['QarnotBucket']


logger = logging.getLogger("uvicorn.warning")


class QarnotBucket:
"""Storage bucket manipulation object on Qarnot computing"""

_bucket: Optional[Bucket] = None
_media_folder: Optional[str] = None

def __init__(self) -> None:
self._connect_to_bucket()

def _connect_to_bucket(self) -> None:
"""Connect to the CSP bucket"""
self._conn = Connection(client_token=cfg.QARNOT_TOKEN)
self._bucket = Bucket(self._conn, cfg.BUCKET_NAME)
if isinstance(cfg.BUCKET_MEDIA_FOLDER, str):
self._media_folder = cfg.BUCKET_MEDIA_FOLDER

@property
def bucket(self) -> Bucket:
if self._bucket is None:
self._connect_to_bucket()
return self._bucket

async def get_file(self, bucket_key: str) -> Optional[str]:
"""Download a file locally and returns the local temp path"""
try:
return self.bucket.get_file(bucket_key)
except Exception as e:
logger.warning(e)
return None

async def check_file_existence(self, bucket_key: str) -> bool:
"""Check whether a file exists on the bucket"""
try:
# Use boto3 head_object method using the Qarnot private connection attribute
# cf. https://github.com/qarnot/qarnot-sdk-python/blob/master/qarnot/connection.py#L188
head_object = self._conn._s3client.head_object(Bucket=cfg.BUCKET_NAME, Key=bucket_key)
return head_object['ResponseMetadata']['HTTPStatusCode'] == 200
except Exception as e:
logger.warning(e)
return False

async def get_public_url(self, bucket_key: str, url_expiration: int = 3600) -> str:
"""Generate a temporary public URL for a bucket file"""
if not await self.check_file_existence(bucket_key):
raise HTTPException(status_code=404, detail="File cannot be found on the bucket storage")

# Point to the bucket file
file_params = {'Bucket': cfg.BUCKET_NAME, 'Key': bucket_key}
# Generate a public URL for it using boto3 presign URL generation
return self._conn._s3client.generate_presigned_url('get_object', Params=file_params, ExpiresIn=url_expiration)

async def upload_file(self, bucket_key: str, file_binary: BinaryIO) -> bool:
"""Upload a file to bucket and return whether the upload succeeded"""
try:
self.bucket.add_file(file_binary, bucket_key)
except Exception as e:
logger.warning(e)
return False
return True

async def fetch_bucket_filenames(self) -> List[str]:
"""List all bucket files"""

if isinstance(self._media_folder, str):
obj_summary = self.bucket.directory(self._media_folder)
else:
obj_summary = self.bucket.list_files()

return [file.key for file in list(obj_summary)]

async def flush_tmp_file(self, filename: str) -> None:
"""Remove temporary file"""
if os.path.exists(filename):
os.remove(filename)

async def delete_file(self, bucket_key: str) -> None:
"""Remove bucket file and return whether the deletion succeeded"""
self.bucket.delete_file(bucket_key)
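
A minimal usage sketch for the new service (assuming `QARNOT_TOKEN`, `BUCKET_NAME` and optionally `BUCKET_MEDIA_FOLDER` are set as described in the README; the local file and bucket key below are hypothetical):

```python
import asyncio

from app.services.bucket import QarnotBucket


async def main() -> None:
    bucket = QarnotBucket()  # connects using QARNOT_TOKEN / BUCKET_NAME from app.config
    with open("sample.jpg", "rb") as f:
        if not await bucket.upload_file(bucket_key="media/sample.jpg", file_binary=f):
            raise RuntimeError("Upload failed")
    # Presigned URL, valid for one hour by default
    url = await bucket.get_public_url("media/sample.jpg")
    print(url)


asyncio.run(main())
```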