Skip to content

Commit

Permalink
File parsing and validation (#70)
Browse files Browse the repository at this point in the history
Closes #5 

* Broke out `validate_submission` into separate functions for specific
tasks.
* Changed `update_submission` to utilize new sessions when making db
calls.
* Call validates file passed in and updates the table to the current
step in the process or with completed result.
* Added pytests and fixtures for api and `submission_processor`.

---------

Co-authored-by: Adam <[email protected]>
  • Loading branch information
guffee23 and jcadam14 authored Mar 6, 2024
1 parent 6c04f90 commit e0763f1
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 53 deletions.
4 changes: 2 additions & 2 deletions src/entities/repos/submission_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Any, List, TypeVar
from entities.engine import get_session
from entities.engine.engine import SessionLocal

from regtech_api_commons.models import AuthenticatedUser

Expand Down Expand Up @@ -94,7 +94,7 @@ async def add_submission(session: AsyncSession, submission: SubmissionDTO) -> Su


async def update_submission(submission: SubmissionDAO, incoming_session: AsyncSession = None) -> SubmissionDAO:
session = incoming_session if incoming_session else await anext(get_session())
session = incoming_session if incoming_session else SessionLocal()
async with session.begin():
try:
new_sub = await session.merge(submission)
Expand Down
2 changes: 1 addition & 1 deletion src/routers/filing.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def upload_file(
):
content = await file.read()
await submission_processor.upload_to_storage(lei, submission_id, content, file.filename.split(".")[-1])
background_tasks.add_task(submission_processor.validate_submission, lei, submission_id, content)
background_tasks.add_task(submission_processor.validate_submission, lei, submission_id, content, background_tasks)


@router.get("/institutions/{lei}/filings/{period_name}/submissions", response_model=List[SubmissionDTO])
Expand Down
53 changes: 50 additions & 3 deletions src/services/submission_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
from io import BytesIO
from fastapi import BackgroundTasks
from regtech_data_validator.create_schemas import validate_phases
import pandas as pd
import importlib.metadata as imeta
from entities.models import SubmissionDAO, SubmissionState
from entities.repos.submission_repo import update_submission
from http import HTTPStatus
from fastapi import HTTPException
import logging
Expand All @@ -19,6 +26,46 @@ async def upload_to_storage(lei: str, submission_id: str, content: bytes, extens
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="Failed to upload file")


async def validate_submission(lei: str, submission_id: str, content: bytes):
# implement validation process here
pass
async def validate_submission(lei: str, submission_id: str, content: bytes, background_tasks: BackgroundTasks):
df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)
validator_version = imeta.version("regtech-data-validator")

# Set VALIDATION_IN_PROGRESS
await update_submission(
SubmissionDAO(
submitter=submission_id,
state=SubmissionState.VALIDATION_IN_PROGRESS,
validation_ruleset_version=validator_version,
)
)
background_tasks.add_task(validate_and_update_submission, df, lei, submission_id, validator_version)


async def validate_and_update_submission(df: pd.DataFrame, lei: str, submission_id: str, validator_version: str):
# Validate Phases
result = validate_phases(df, {"lei": lei})

# Update tables with response
if not result[0]:
sub_state = (
SubmissionState.VALIDATION_WITH_ERRORS
if "error" in result[1]["validation_severity"].values
else SubmissionState.VALIDATION_WITH_WARNINGS
)
await update_submission(
SubmissionDAO(
submitter=submission_id,
state=sub_state,
validation_ruleset_version=validator_version,
validation_json=result[1].to_json(),
)
)
else:
await update_submission(
SubmissionDAO(
submitter=submission_id,
state=SubmissionState.VALIDATION_SUCCESSFUL,
validation_ruleset_version=validator_version,
validation_json=result[1].to_json(),
)
)
9 changes: 9 additions & 0 deletions tests/api/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from fastapi import FastAPI
from pytest_mock import MockerFixture
from unittest.mock import Mock
import pandas as pd

from entities.models import (
FilingPeriodDAO,
Expand Down Expand Up @@ -122,3 +123,11 @@ def post_filing_mock(mocker: MockerFixture) -> Mock:
institution_snapshot_id="v1",
)
return mock


@pytest.fixture(scope="session")
def submission_csv(tmpdir_factory) -> str:
df = pd.DataFrame([["0", "1"]], columns=["Submission_Column_1", "Submission_Column_2"])
filename = str(tmpdir_factory.mktemp("data").join("submission.csv"))
df.to_csv(filename)
return filename
20 changes: 19 additions & 1 deletion tests/api/routers/test_filing_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,25 @@ async def test_get_latest_submission(self, mocker: MockerFixture, app_fixture: F
mock.assert_called_with(ANY, "1234567890", "2024")
assert res.status_code == 204

async def test_unauthed_patch_filing(self, mocker: MockerFixture, app_fixture: FastAPI):
def test_authed_upload_file(
self, mocker: MockerFixture, app_fixture: FastAPI, authed_user_mock: Mock, submission_csv: str
):
mock_upload = mocker.patch("services.submission_processor.upload_to_storage")
mock_upload.return_value = None
mock_validate_submission = mocker.patch("services.submission_processor.validate_submission")
mock_validate_submission.return_value = None
files = {"file": ("submission.csv", open(submission_csv, "rb"))}
client = TestClient(app_fixture)
res = client.post("/v1/filing/123456790/submissions/1", files=files)
assert res.status_code == 202

def test_unauthed_upload_file(self, mocker: MockerFixture, app_fixture: FastAPI, submission_csv: str):
files = {"file": ("submission.csv", open(submission_csv, "rb"))}
client = TestClient(app_fixture)
res = client.post("/v1/filing/123456790/submissions/1", files=files)
assert res.status_code == 403

async def test_unauthed_patch_filing(self, app_fixture: FastAPI):
client = TestClient(app_fixture)

res = client.patch(
Expand Down
4 changes: 1 addition & 3 deletions tests/entities/repos/test_submission_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
from regtech_api_commons.models import AuthenticatedUser
from pytest_mock import MockerFixture

from entities.engine import engine as entities_engine


class TestSubmissionRepo:
@pytest.fixture(scope="function", autouse=True)
async def setup(
self, transaction_session: AsyncSession, mocker: MockerFixture, session_generator: async_scoped_session
):
mocker.patch.object(entities_engine, "SessionLocal", return_value=session_generator)
mocker.patch.object(repo, "SessionLocal", return_value=session_generator)

filing_task_1 = FilingTaskDAO(name="Task-1", task_order=1)
filing_task_2 = FilingTaskDAO(name="Task-2", task_order=2)
Expand Down
109 changes: 66 additions & 43 deletions tests/services/test_submission_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pandas as pd
from services import submission_processor
from fastapi import HTTPException
import pytest
from unittest.mock import Mock, ANY
Expand All @@ -6,46 +8,67 @@
from services.submission_processor import upload_to_storage


@pytest.fixture
def mock_fs(mocker: MockerFixture) -> Mock:
fs_mock_patch = mocker.patch("services.submission_processor.AbstractFileSystem")
return fs_mock_patch.return_value


@pytest.fixture
def mock_fs_func(mocker: MockerFixture, mock_fs: Mock) -> Mock:
fs_func_mock = mocker.patch("services.submission_processor.filesystem")
fs_func_mock.return_value = mock_fs
return fs_func_mock


async def test_upload(mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
with mocker.mock_open(mock_fs.open):
await upload_to_storage("test", "test", b"test content local")
mock_fs_func.assert_called()
mock_fs.mkdirs.assert_called()
mock_fs.open.assert_called_with(ANY, "wb")
file_handle = mock_fs.open()
file_handle.write.assert_called_with(b"test content local")


async def test_upload_s3_no_mkdir(mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
default_fs_proto = settings.upload_fs_protocol
settings.upload_fs_protocol = FsProtocol.S3
with mocker.mock_open(mock_fs.open):
await upload_to_storage("test", "test", b"test content s3")
mock_fs_func.assert_called()
mock_fs.mkdirs.assert_not_called()
mock_fs.open.assert_called_with(ANY, "wb")
file_handle = mock_fs.open()
file_handle.write.assert_called_with(b"test content s3")
settings.upload_fs_protocol = default_fs_proto


async def test_upload_failure(mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
log_mock = mocker.patch("services.submission_processor.log")
mock_fs.mkdirs.side_effect = IOError("test")
with pytest.raises(Exception) as e:
await upload_to_storage("test", "test", b"test content")
log_mock.error.assert_called_with("Failed to upload file", ANY, exc_info=True, stack_info=True)
assert isinstance(e.value, HTTPException)
class TestSubmissionProcessor:
@pytest.fixture
def mock_fs(self, mocker: MockerFixture) -> Mock:
fs_mock_patch = mocker.patch("services.submission_processor.AbstractFileSystem")
return fs_mock_patch.return_value

@pytest.fixture
def mock_fs_func(self, mocker: MockerFixture, mock_fs: Mock) -> Mock:
fs_func_mock = mocker.patch("services.submission_processor.filesystem")
fs_func_mock.return_value = mock_fs
return fs_func_mock

async def test_upload(self, mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
with mocker.mock_open(mock_fs.open):
await upload_to_storage("test", "test", b"test content local")
mock_fs_func.assert_called()
mock_fs.mkdirs.assert_called()
mock_fs.open.assert_called_with(ANY, "wb")
file_handle = mock_fs.open()
file_handle.write.assert_called_with(b"test content local")

async def test_upload_s3_no_mkdir(self, mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
default_fs_proto = settings.upload_fs_protocol
settings.upload_fs_protocol = FsProtocol.S3
with mocker.mock_open(mock_fs.open):
await upload_to_storage("test", "test", b"test content s3")
mock_fs_func.assert_called()
mock_fs.mkdirs.assert_not_called()
mock_fs.open.assert_called_with(ANY, "wb")
file_handle = mock_fs.open()
file_handle.write.assert_called_with(b"test content s3")
settings.upload_fs_protocol = default_fs_proto

async def test_upload_failure(self, mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
log_mock = mocker.patch("services.submission_processor.log")
mock_fs.mkdirs.side_effect = IOError("test")
with pytest.raises(Exception) as e:
await upload_to_storage("test", "test", b"test content")
log_mock.error.assert_called_with("Failed to upload file", ANY, exc_info=True, stack_info=True)
assert isinstance(e.value, HTTPException)

async def test_validate_and_update_successful(self, mocker: MockerFixture):
mock_validation = mocker.patch("services.submission_processor.validate_phases")
mock_validation.return_value = (True, pd.DataFrame(columns=[], index=[]))
mock_update_submission = mocker.patch("services.submission_processor.update_submission")
mock_update_submission.return_value = None
await submission_processor.validate_and_update_submission(pd.DataFrame(), "123456790", "1", "0.1.0")
assert mock_update_submission.mock_calls[0].args[0].state == "VALIDATION_SUCCESSFUL"

async def test_validate_and_update_warnings(self, mocker: MockerFixture):
mock_validation = mocker.patch("services.submission_processor.validate_phases")
mock_validation.return_value = (False, pd.DataFrame([["warning"]], columns=["validation_severity"]))
mock_update_submission = mocker.patch("services.submission_processor.update_submission")
mock_update_submission.return_value = None
await submission_processor.validate_and_update_submission(pd.DataFrame(), "123456790", "1", "0.1.0")
assert mock_update_submission.mock_calls[0].args[0].state == "VALIDATION_WITH_WARNINGS"

async def test_validate_and_update_errors(self, mocker: MockerFixture):
mock_validation = mocker.patch("services.submission_processor.validate_phases")
mock_validation.return_value = (False, pd.DataFrame([["error"]], columns=["validation_severity"]))
mock_update_submission = mocker.patch("services.submission_processor.update_submission")
mock_update_submission.return_value = None
await submission_processor.validate_and_update_submission(pd.DataFrame(), "123456790", "1", "0.1.0")
assert mock_update_submission.mock_calls[0].args[0].state == "VALIDATION_WITH_ERRORS"

0 comments on commit e0763f1

Please sign in to comment.