diff --git a/src/entities/repos/submission_repo.py b/src/entities/repos/submission_repo.py index a13ea53..2f74a3b 100644 --- a/src/entities/repos/submission_repo.py +++ b/src/entities/repos/submission_repo.py @@ -3,7 +3,7 @@ from sqlalchemy import select, desc from sqlalchemy.ext.asyncio import AsyncSession from typing import Any, List, TypeVar -from entities.engine import get_session +from entities.engine.engine import SessionLocal from regtech_api_commons.models import AuthenticatedUser @@ -94,7 +94,7 @@ async def add_submission(session: AsyncSession, submission: SubmissionDTO) -> Su async def update_submission(submission: SubmissionDAO, incoming_session: AsyncSession = None) -> SubmissionDAO: - session = incoming_session if incoming_session else await anext(get_session()) + session = incoming_session if incoming_session else SessionLocal() async with session.begin(): try: new_sub = await session.merge(submission) diff --git a/src/routers/filing.py b/src/routers/filing.py index 4e87e6c..ddad66f 100644 --- a/src/routers/filing.py +++ b/src/routers/filing.py @@ -49,7 +49,7 @@ async def upload_file( ): content = await file.read() await submission_processor.upload_to_storage(lei, submission_id, content, file.filename.split(".")[-1]) - background_tasks.add_task(submission_processor.validate_submission, lei, submission_id, content) + background_tasks.add_task(submission_processor.validate_submission, lei, submission_id, content, background_tasks) @router.get("/institutions/{lei}/filings/{period_name}/submissions", response_model=List[SubmissionDTO]) diff --git a/src/services/submission_processor.py b/src/services/submission_processor.py index 42217f3..eb41563 100644 --- a/src/services/submission_processor.py +++ b/src/services/submission_processor.py @@ -1,3 +1,10 @@ +from io import BytesIO +from fastapi import BackgroundTasks +from regtech_data_validator.create_schemas import validate_phases +import pandas as pd +import importlib.metadata as imeta +from entities.models import SubmissionDAO, SubmissionState +from entities.repos.submission_repo import update_submission from http import HTTPStatus from fastapi import HTTPException import logging @@ -19,6 +26,46 @@ async def upload_to_storage(lei: str, submission_id: str, content: bytes, extens raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="Failed to upload file") -async def validate_submission(lei: str, submission_id: str, content: bytes): - # implement validation process here - pass +async def validate_submission(lei: str, submission_id: str, content: bytes, background_tasks: BackgroundTasks): + df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False) + validator_version = imeta.version("regtech-data-validator") + + # Set VALIDATION_IN_PROGRESS + await update_submission( + SubmissionDAO( + submitter=submission_id, + state=SubmissionState.VALIDATION_IN_PROGRESS, + validation_ruleset_version=validator_version, + ) + ) + background_tasks.add_task(validate_and_update_submission, df, lei, submission_id, validator_version) + + +async def validate_and_update_submission(df: pd.DataFrame, lei: str, submission_id: str, validator_version: str): + # Validate Phases + result = validate_phases(df, {"lei": lei}) + + # Update tables with response + if not result[0]: + sub_state = ( + SubmissionState.VALIDATION_WITH_ERRORS + if "error" in result[1]["validation_severity"].values + else SubmissionState.VALIDATION_WITH_WARNINGS + ) + await update_submission( + SubmissionDAO( + submitter=submission_id, + state=sub_state, + validation_ruleset_version=validator_version, + validation_json=result[1].to_json(), + ) + ) + else: + await update_submission( + SubmissionDAO( + submitter=submission_id, + state=SubmissionState.VALIDATION_SUCCESSFUL, + validation_ruleset_version=validator_version, + validation_json=result[1].to_json(), + ) + ) diff --git a/tests/api/conftest.py b/tests/api/conftest.py index 03ce0f9..d73d13b 100644 --- a/tests/api/conftest.py +++ b/tests/api/conftest.py @@ -4,6 +4,7 @@ from fastapi import FastAPI from pytest_mock import MockerFixture from unittest.mock import Mock +import pandas as pd from entities.models import ( FilingPeriodDAO, @@ -122,3 +123,11 @@ def post_filing_mock(mocker: MockerFixture) -> Mock: institution_snapshot_id="v1", ) return mock + + +@pytest.fixture(scope="session") +def submission_csv(tmpdir_factory) -> str: + df = pd.DataFrame([["0", "1"]], columns=["Submission_Column_1", "Submission_Column_2"]) + filename = str(tmpdir_factory.mktemp("data").join("submission.csv")) + df.to_csv(filename) + return filename diff --git a/tests/api/routers/test_filing_api.py b/tests/api/routers/test_filing_api.py index c569d73..53439f2 100644 --- a/tests/api/routers/test_filing_api.py +++ b/tests/api/routers/test_filing_api.py @@ -122,7 +122,25 @@ async def test_get_latest_submission(self, mocker: MockerFixture, app_fixture: F mock.assert_called_with(ANY, "1234567890", "2024") assert res.status_code == 204 - async def test_unauthed_patch_filing(self, mocker: MockerFixture, app_fixture: FastAPI): + def test_authed_upload_file( + self, mocker: MockerFixture, app_fixture: FastAPI, authed_user_mock: Mock, submission_csv: str + ): + mock_upload = mocker.patch("services.submission_processor.upload_to_storage") + mock_upload.return_value = None + mock_validate_submission = mocker.patch("services.submission_processor.validate_submission") + mock_validate_submission.return_value = None + files = {"file": ("submission.csv", open(submission_csv, "rb"))} + client = TestClient(app_fixture) + res = client.post("/v1/filing/123456790/submissions/1", files=files) + assert res.status_code == 202 + + def test_unauthed_upload_file(self, mocker: MockerFixture, app_fixture: FastAPI, submission_csv: str): + files = {"file": ("submission.csv", open(submission_csv, "rb"))} + client = TestClient(app_fixture) + res = client.post("/v1/filing/123456790/submissions/1", files=files) + assert res.status_code == 403 + + async def test_unauthed_patch_filing(self, app_fixture: FastAPI): client = TestClient(app_fixture) res = client.patch( diff --git a/tests/entities/repos/test_submission_repo.py b/tests/entities/repos/test_submission_repo.py index 463b9ad..8727a69 100644 --- a/tests/entities/repos/test_submission_repo.py +++ b/tests/entities/repos/test_submission_repo.py @@ -22,15 +22,13 @@ from regtech_api_commons.models import AuthenticatedUser from pytest_mock import MockerFixture -from entities.engine import engine as entities_engine - class TestSubmissionRepo: @pytest.fixture(scope="function", autouse=True) async def setup( self, transaction_session: AsyncSession, mocker: MockerFixture, session_generator: async_scoped_session ): - mocker.patch.object(entities_engine, "SessionLocal", return_value=session_generator) + mocker.patch.object(repo, "SessionLocal", return_value=session_generator) filing_task_1 = FilingTaskDAO(name="Task-1", task_order=1) filing_task_2 = FilingTaskDAO(name="Task-2", task_order=2) diff --git a/tests/services/test_submission_processor.py b/tests/services/test_submission_processor.py index bd7c0a1..f620923 100644 --- a/tests/services/test_submission_processor.py +++ b/tests/services/test_submission_processor.py @@ -1,3 +1,5 @@ +import pandas as pd +from services import submission_processor from fastapi import HTTPException import pytest from unittest.mock import Mock, ANY @@ -6,46 +8,67 @@ from services.submission_processor import upload_to_storage -@pytest.fixture -def mock_fs(mocker: MockerFixture) -> Mock: - fs_mock_patch = mocker.patch("services.submission_processor.AbstractFileSystem") - return fs_mock_patch.return_value - - -@pytest.fixture -def mock_fs_func(mocker: MockerFixture, mock_fs: Mock) -> Mock: - fs_func_mock = mocker.patch("services.submission_processor.filesystem") - fs_func_mock.return_value = mock_fs - return fs_func_mock - - -async def test_upload(mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock): - with mocker.mock_open(mock_fs.open): - await upload_to_storage("test", "test", b"test content local") - mock_fs_func.assert_called() - mock_fs.mkdirs.assert_called() - mock_fs.open.assert_called_with(ANY, "wb") - file_handle = mock_fs.open() - file_handle.write.assert_called_with(b"test content local") - - -async def test_upload_s3_no_mkdir(mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock): - default_fs_proto = settings.upload_fs_protocol - settings.upload_fs_protocol = FsProtocol.S3 - with mocker.mock_open(mock_fs.open): - await upload_to_storage("test", "test", b"test content s3") - mock_fs_func.assert_called() - mock_fs.mkdirs.assert_not_called() - mock_fs.open.assert_called_with(ANY, "wb") - file_handle = mock_fs.open() - file_handle.write.assert_called_with(b"test content s3") - settings.upload_fs_protocol = default_fs_proto - - -async def test_upload_failure(mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock): - log_mock = mocker.patch("services.submission_processor.log") - mock_fs.mkdirs.side_effect = IOError("test") - with pytest.raises(Exception) as e: - await upload_to_storage("test", "test", b"test content") - log_mock.error.assert_called_with("Failed to upload file", ANY, exc_info=True, stack_info=True) - assert isinstance(e.value, HTTPException) +class TestSubmissionProcessor: + @pytest.fixture + def mock_fs(self, mocker: MockerFixture) -> Mock: + fs_mock_patch = mocker.patch("services.submission_processor.AbstractFileSystem") + return fs_mock_patch.return_value + + @pytest.fixture + def mock_fs_func(self, mocker: MockerFixture, mock_fs: Mock) -> Mock: + fs_func_mock = mocker.patch("services.submission_processor.filesystem") + fs_func_mock.return_value = mock_fs + return fs_func_mock + + async def test_upload(self, mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock): + with mocker.mock_open(mock_fs.open): + await upload_to_storage("test", "test", b"test content local") + mock_fs_func.assert_called() + mock_fs.mkdirs.assert_called() + mock_fs.open.assert_called_with(ANY, "wb") + file_handle = mock_fs.open() + file_handle.write.assert_called_with(b"test content local") + + async def test_upload_s3_no_mkdir(self, mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock): + default_fs_proto = settings.upload_fs_protocol + settings.upload_fs_protocol = FsProtocol.S3 + with mocker.mock_open(mock_fs.open): + await upload_to_storage("test", "test", b"test content s3") + mock_fs_func.assert_called() + mock_fs.mkdirs.assert_not_called() + mock_fs.open.assert_called_with(ANY, "wb") + file_handle = mock_fs.open() + file_handle.write.assert_called_with(b"test content s3") + settings.upload_fs_protocol = default_fs_proto + + async def test_upload_failure(self, mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock): + log_mock = mocker.patch("services.submission_processor.log") + mock_fs.mkdirs.side_effect = IOError("test") + with pytest.raises(Exception) as e: + await upload_to_storage("test", "test", b"test content") + log_mock.error.assert_called_with("Failed to upload file", ANY, exc_info=True, stack_info=True) + assert isinstance(e.value, HTTPException) + + async def test_validate_and_update_successful(self, mocker: MockerFixture): + mock_validation = mocker.patch("services.submission_processor.validate_phases") + mock_validation.return_value = (True, pd.DataFrame(columns=[], index=[])) + mock_update_submission = mocker.patch("services.submission_processor.update_submission") + mock_update_submission.return_value = None + await submission_processor.validate_and_update_submission(pd.DataFrame(), "123456790", "1", "0.1.0") + assert mock_update_submission.mock_calls[0].args[0].state == "VALIDATION_SUCCESSFUL" + + async def test_validate_and_update_warnings(self, mocker: MockerFixture): + mock_validation = mocker.patch("services.submission_processor.validate_phases") + mock_validation.return_value = (False, pd.DataFrame([["warning"]], columns=["validation_severity"])) + mock_update_submission = mocker.patch("services.submission_processor.update_submission") + mock_update_submission.return_value = None + await submission_processor.validate_and_update_submission(pd.DataFrame(), "123456790", "1", "0.1.0") + assert mock_update_submission.mock_calls[0].args[0].state == "VALIDATION_WITH_WARNINGS" + + async def test_validate_and_update_errors(self, mocker: MockerFixture): + mock_validation = mocker.patch("services.submission_processor.validate_phases") + mock_validation.return_value = (False, pd.DataFrame([["error"]], columns=["validation_severity"])) + mock_update_submission = mocker.patch("services.submission_processor.update_submission") + mock_update_submission.return_value = None + await submission_processor.validate_and_update_submission(pd.DataFrame(), "123456790", "1", "0.1.0") + assert mock_update_submission.mock_calls[0].args[0].state == "VALIDATION_WITH_ERRORS"