From 5e2173599999c35704593a7509b06fde38defc6f Mon Sep 17 00:00:00 2001 From: 1yam Date: Tue, 22 Aug 2023 13:37:32 +0200 Subject: [PATCH] Fix: Mypy error --- src/aleph/web/controllers/storage.py | 65 +++++++++++++++------------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index e265889d5..9e4c479cf 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -1,49 +1,36 @@ -import asyncio import base64 import datetime as dt import functools import logging from hashlib import sha256 -from io import StringIO -from typing import Union, Tuple, Dict, Optional +from typing import Union, Tuple import aio_pika from eth_account import Account from eth_account.messages import encode_defunct from aleph.chains.common import get_verification_buffer -from aleph.jobs.process_pending_messages import PendingMessageProcessor from aiohttp import web from aiohttp.web_request import FileField from aleph_message.models import ItemType from multidict import MultiDictProxy - -from aleph.chains.chain_service import ChainService, LOGGER -from aleph.chains.nuls import NulsConnector from aleph.db.accessors.balances import get_total_balance from aleph.db.accessors.files import count_file_pins, get_file -from aleph.db.accessors.messages import get_message_status, message_exists -from aleph.db.connection import make_session_factory from aleph.db.models import PendingMessageDb from aleph.exceptions import AlephStorageException, UnknownHashError -from aleph.services.p2p import init_p2p_client -from aleph.services.storage.engine import StorageEngine -from aleph.services.storage.fileystem_engine import FileSystemStorageEngine -from aleph.storage import StorageService from aleph.toolkit import json -from aleph.toolkit.timestamp import timestamp_to_datetime -from aleph.types.db_session import DbSessionFactory, DbSession +from aleph.types.db_session import DbSession from aleph.utils import run_in_executor, item_type_from_hash from aleph.web.controllers.app_state_getters import ( get_session_factory_from_request, - get_storage_service_from_request, get_mq_channel_from_request, get_config_from_request, get_mq_conn_from_request, + get_storage_service_from_request, + get_config_from_request, ) from aleph.web.controllers.utils import multidict_proxy_to_io from aleph.schemas.pending_messages import BasePendingMessage logger = logging.getLogger(__name__) -from aleph.schemas.pending_messages import parse_message MAX_FILE_SIZE = 100 * 1024 * 1024 @@ -111,34 +98,47 @@ async def verify_signature(message: BasePendingMessage) -> bool: return verified -async def get_message_content(post_data: MultiDictProxy[Union[str, bytes, FileField]]) -> Tuple[dict, int]: +async def get_message_content( + post_data: MultiDictProxy[Union[str, bytes, FileField]] +) -> Tuple[dict, int]: message_bytearray = post_data.get("message", b"") value = post_data.get("size") or 0 - if not message_bytearray: - return {}, int(value) # Empty dictionary if no message content - message_string = message_bytearray.decode("utf-8") - message_dict = json.loads(message_string) - message_dict["time"] = float(message_dict["time"]) + if isinstance(message_bytearray, bytearray): + message_string = message_bytearray.decode("utf-8") + message_dict = json.loads(message_string) + message_dict["time"] = float(message_dict["time"]) + else: + message_dict = {} - return message_dict, int(value) + return message_dict, int(str(value)) async def init_mq_con(config): return await aio_pika.connect_robust( - host=config.p2p.mq_host.value, port=config.rabbitmq.port.value, login=config.rabbitmq.username.value, - password=config.rabbitmq.password.value + host=config.p2p.mq_host.value, + port=config.rabbitmq.port.value, + login=config.rabbitmq.username.value, + password=config.rabbitmq.password.value, ) -async def verify_and_handle_request(pending_message_db, file_io, message, size): +async def verify_and_handle_request( + pending_message_db, file_io, message, size, session_factory +): content = file_io.read(size) item_content = json.loads(message["item_content"]) actual_item_hash = sha256(content).hexdigest() c_item_hash = item_content["item_hash"] is_signature = await verify_signature(message=pending_message_db) - + with session_factory() as session: + current_balance = get_total_balance( + session=session, address=pending_message_db.sender + ) + if current_balance < len(content): + output = {"status": "Payment Required"} + return web.json_response(output, status=402) if not is_signature: output = {"status": "Forbidden"} return web.json_response(output, status=403) @@ -161,9 +161,12 @@ async def storage_add_file_with_message(request: web.Request): post = await request.post() file_io = multidict_proxy_to_io(post) message, size = await get_message_content(post) - pending_message_db = PendingMessageDb.from_message_dict(message_dict=message, reception_time=dt.datetime.now(), - fetched=True) - is_valid_message = await verify_and_handle_request(pending_message_db, file_io, message, size) + pending_message_db = PendingMessageDb.from_message_dict( + message_dict=message, reception_time=dt.datetime.now(), fetched=True + ) + is_valid_message = await verify_and_handle_request( + pending_message_db, file_io, message, size, session_factory + ) if is_valid_message is not None: return is_valid_message