Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File parsing and validation #70

Merged
merged 24 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c66000e
Added update_submission function and associated pytest
jcadam14 Jan 19, 2024
4480894
Pass file through validator
guffee23 Jan 23, 2024
f702751
Merge branch '5_file_parse_and_val_v2' into 5_21_update_submission_pa…
guffee23 Jan 23, 2024
fc42dd7
Update table to correct state as validation progresses
guffee23 Jan 30, 2024
ab41fdd
Black formatting
guffee23 Jan 30, 2024
5ce69bc
Merge branch 'main' into 5_file_parse_and_val_v2
guffee23 Feb 5, 2024
8fd168a
Merge branch 'main' into 5_file_parse_and_val_v2
guffee23 Feb 7, 2024
b9fd240
Split validation and state table update into own function
guffee23 Feb 16, 2024
02a1039
Merge branch 'main' into 5_file_parse_and_val_v2
guffee23 Feb 16, 2024
82a9d00
Set to new session instance for separate transactions
guffee23 Feb 16, 2024
a681c68
Added upload file api test
guffee23 Feb 20, 2024
5e52cae
Renamed test function
guffee23 Feb 20, 2024
14503b5
WIP - validate and update tests
guffee23 Feb 22, 2024
942d062
Merge branch 'main' into 5_file_parse_and_val_v2
guffee23 Feb 22, 2024
853ba29
Finished valdiation api tests
guffee23 Feb 26, 2024
d1585eb
Added validation version to in progress table write
guffee23 Feb 28, 2024
73e6ca8
Merge branch 'main' into 5_file_parse_and_val_v2
guffee23 Mar 1, 2024
db84ade
Moved tests under TestSubmissionProcessor class
guffee23 Mar 4, 2024
2988429
Fixed mock in test submission repo
guffee23 Mar 4, 2024
fbd43c5
Merge branch 'main' into 5_file_parse_and_val_v2
guffee23 Mar 4, 2024
ca9e3dd
Added unauthed test for file upload
guffee23 Mar 5, 2024
65cb7b0
Removed unnecessary lines
guffee23 Mar 6, 2024
0cc3563
Merge branch 'main' into 5_file_parse_and_val_v2
guffee23 Mar 6, 2024
40dc399
Merge branch 'main' into 5_file_parse_and_val_v2
guffee23 Mar 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/entities/repos/submission_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Any, List, TypeVar
from entities.engine import get_session
from entities.engine.engine import SessionLocal

from regtech_api_commons.models import AuthenticatedUser

Expand Down Expand Up @@ -94,7 +94,7 @@ async def add_submission(session: AsyncSession, submission: SubmissionDTO) -> Su


async def update_submission(submission: SubmissionDAO, incoming_session: AsyncSession = None) -> SubmissionDAO:
session = incoming_session if incoming_session else await anext(get_session())
session = incoming_session if incoming_session else SessionLocal()
async with session.begin():
try:
new_sub = await session.merge(submission)
Expand Down
2 changes: 1 addition & 1 deletion src/routers/filing.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def upload_file(
):
content = await file.read()
await submission_processor.upload_to_storage(lei, submission_id, content, file.filename.split(".")[-1])
background_tasks.add_task(submission_processor.validate_submission, lei, submission_id, content)
background_tasks.add_task(submission_processor.validate_submission, lei, submission_id, content, background_tasks)
Copy link
Collaborator

@lchen-2101 lchen-2101 Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's go with getting submission updated before the background_task kicks in; we don't need 2 nested bg tasks. so have the submission dao updated here to say validation in progress; then just 1 bg task that does the validation and update the submission dao when validation completes.
when we do the first submission update with the validation in progress, pass in the session already attached to the request. so

repo.update_submission(the_dao, request.state.db_session)

then the bg task's one, call it without passing in a session.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, we'll deal with this in the refactor that will happen in #51; just address the submission_id comment.



@router.get("/institutions/{lei}/filings/{period_name}/submissions", response_model=List[SubmissionDTO])
Expand Down
53 changes: 50 additions & 3 deletions src/services/submission_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
from io import BytesIO
from fastapi import BackgroundTasks
from regtech_data_validator.create_schemas import validate_phases
import pandas as pd
import importlib.metadata as imeta
from entities.models import SubmissionDAO, SubmissionState
from entities.repos.submission_repo import update_submission
from http import HTTPStatus
from fastapi import HTTPException
import logging
Expand All @@ -19,6 +26,46 @@ async def upload_to_storage(lei: str, submission_id: str, content: bytes, extens
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="Failed to upload file")


async def validate_submission(lei: str, submission_id: str, content: bytes):
# implement validation process here
pass
async def validate_submission(lei: str, submission_id: str, content: bytes, background_tasks: BackgroundTasks):
df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)
validator_version = imeta.version("regtech-data-validator")

# Set VALIDATION_IN_PROGRESS
await update_submission(
SubmissionDAO(
submitter=submission_id,
state=SubmissionState.VALIDATION_IN_PROGRESS,
validation_ruleset_version=validator_version,
)
)
background_tasks.add_task(validate_and_update_submission, df, lei, submission_id, validator_version)


async def validate_and_update_submission(df: pd.DataFrame, lei: str, submission_id: str, validator_version: str):
# Validate Phases
result = validate_phases(df, {"lei": lei})

# Update tables with response
if not result[0]:
sub_state = (
SubmissionState.VALIDATION_WITH_ERRORS
if "error" in result[1]["validation_severity"].values
else SubmissionState.VALIDATION_WITH_WARNINGS
)
await update_submission(
SubmissionDAO(
submitter=submission_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above comment

state=sub_state,
validation_ruleset_version=validator_version,
validation_json=result[1].to_json(),
)
)
else:
await update_submission(
SubmissionDAO(
submitter=submission_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

state=SubmissionState.VALIDATION_SUCCESSFUL,
validation_ruleset_version=validator_version,
validation_json=result[1].to_json(),
)
)
9 changes: 9 additions & 0 deletions tests/api/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from fastapi import FastAPI
from pytest_mock import MockerFixture
from unittest.mock import Mock
import pandas as pd

from entities.models import (
FilingPeriodDAO,
Expand Down Expand Up @@ -122,3 +123,11 @@ def post_filing_mock(mocker: MockerFixture) -> Mock:
institution_snapshot_id="v1",
)
return mock


@pytest.fixture(scope="session")
def submission_csv(tmpdir_factory) -> str:
df = pd.DataFrame([["0", "1"]], columns=["Submission_Column_1", "Submission_Column_2"])
filename = str(tmpdir_factory.mktemp("data").join("submission.csv"))
df.to_csv(filename)
return filename
20 changes: 19 additions & 1 deletion tests/api/routers/test_filing_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,25 @@ async def test_get_latest_submission(self, mocker: MockerFixture, app_fixture: F
mock.assert_called_with(ANY, "1234567890", "2024")
assert res.status_code == 204

async def test_unauthed_patch_filing(self, mocker: MockerFixture, app_fixture: FastAPI):
def test_authed_upload_file(
self, mocker: MockerFixture, app_fixture: FastAPI, authed_user_mock: Mock, submission_csv: str
):
mock_upload = mocker.patch("services.submission_processor.upload_to_storage")
mock_upload.return_value = None
mock_validate_submission = mocker.patch("services.submission_processor.validate_submission")
mock_validate_submission.return_value = None
files = {"file": ("submission.csv", open(submission_csv, "rb"))}
client = TestClient(app_fixture)
res = client.post("/v1/filing/123456790/submissions/1", files=files)
assert res.status_code == 202

def test_unauthed_upload_file(self, mocker: MockerFixture, app_fixture: FastAPI, submission_csv: str):
files = {"file": ("submission.csv", open(submission_csv, "rb"))}
client = TestClient(app_fixture)
res = client.post("/v1/filing/123456790/submissions/1", files=files)
assert res.status_code == 403

async def test_unauthed_patch_filing(self, app_fixture: FastAPI):
client = TestClient(app_fixture)

res = client.patch(
Expand Down
4 changes: 1 addition & 3 deletions tests/entities/repos/test_submission_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
from regtech_api_commons.models import AuthenticatedUser
from pytest_mock import MockerFixture

from entities.engine import engine as entities_engine


class TestSubmissionRepo:
@pytest.fixture(scope="function", autouse=True)
async def setup(
self, transaction_session: AsyncSession, mocker: MockerFixture, session_generator: async_scoped_session
):
mocker.patch.object(entities_engine, "SessionLocal", return_value=session_generator)
mocker.patch.object(repo, "SessionLocal", return_value=session_generator)

filing_task_1 = FilingTaskDAO(name="Task-1", task_order=1)
filing_task_2 = FilingTaskDAO(name="Task-2", task_order=2)
Expand Down
109 changes: 66 additions & 43 deletions tests/services/test_submission_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pandas as pd
from services import submission_processor
from fastapi import HTTPException
import pytest
from unittest.mock import Mock, ANY
Expand All @@ -6,46 +8,67 @@
from services.submission_processor import upload_to_storage


@pytest.fixture
def mock_fs(mocker: MockerFixture) -> Mock:
fs_mock_patch = mocker.patch("services.submission_processor.AbstractFileSystem")
return fs_mock_patch.return_value


@pytest.fixture
def mock_fs_func(mocker: MockerFixture, mock_fs: Mock) -> Mock:
fs_func_mock = mocker.patch("services.submission_processor.filesystem")
fs_func_mock.return_value = mock_fs
return fs_func_mock


async def test_upload(mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
with mocker.mock_open(mock_fs.open):
await upload_to_storage("test", "test", b"test content local")
mock_fs_func.assert_called()
mock_fs.mkdirs.assert_called()
mock_fs.open.assert_called_with(ANY, "wb")
file_handle = mock_fs.open()
file_handle.write.assert_called_with(b"test content local")


async def test_upload_s3_no_mkdir(mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
default_fs_proto = settings.upload_fs_protocol
settings.upload_fs_protocol = FsProtocol.S3
with mocker.mock_open(mock_fs.open):
await upload_to_storage("test", "test", b"test content s3")
mock_fs_func.assert_called()
mock_fs.mkdirs.assert_not_called()
mock_fs.open.assert_called_with(ANY, "wb")
file_handle = mock_fs.open()
file_handle.write.assert_called_with(b"test content s3")
settings.upload_fs_protocol = default_fs_proto


async def test_upload_failure(mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
log_mock = mocker.patch("services.submission_processor.log")
mock_fs.mkdirs.side_effect = IOError("test")
with pytest.raises(Exception) as e:
await upload_to_storage("test", "test", b"test content")
log_mock.error.assert_called_with("Failed to upload file", ANY, exc_info=True, stack_info=True)
assert isinstance(e.value, HTTPException)
class TestSubmissionProcessor:
@pytest.fixture
def mock_fs(self, mocker: MockerFixture) -> Mock:
fs_mock_patch = mocker.patch("services.submission_processor.AbstractFileSystem")
return fs_mock_patch.return_value

@pytest.fixture
def mock_fs_func(self, mocker: MockerFixture, mock_fs: Mock) -> Mock:
fs_func_mock = mocker.patch("services.submission_processor.filesystem")
fs_func_mock.return_value = mock_fs
return fs_func_mock

async def test_upload(self, mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
with mocker.mock_open(mock_fs.open):
await upload_to_storage("test", "test", b"test content local")
mock_fs_func.assert_called()
mock_fs.mkdirs.assert_called()
mock_fs.open.assert_called_with(ANY, "wb")
file_handle = mock_fs.open()
file_handle.write.assert_called_with(b"test content local")

async def test_upload_s3_no_mkdir(self, mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
default_fs_proto = settings.upload_fs_protocol
settings.upload_fs_protocol = FsProtocol.S3
with mocker.mock_open(mock_fs.open):
await upload_to_storage("test", "test", b"test content s3")
mock_fs_func.assert_called()
mock_fs.mkdirs.assert_not_called()
mock_fs.open.assert_called_with(ANY, "wb")
file_handle = mock_fs.open()
file_handle.write.assert_called_with(b"test content s3")
settings.upload_fs_protocol = default_fs_proto

async def test_upload_failure(self, mocker: MockerFixture, mock_fs_func: Mock, mock_fs: Mock):
log_mock = mocker.patch("services.submission_processor.log")
mock_fs.mkdirs.side_effect = IOError("test")
with pytest.raises(Exception) as e:
await upload_to_storage("test", "test", b"test content")
log_mock.error.assert_called_with("Failed to upload file", ANY, exc_info=True, stack_info=True)
assert isinstance(e.value, HTTPException)

async def test_validate_and_update_successful(self, mocker: MockerFixture):
mock_validation = mocker.patch("services.submission_processor.validate_phases")
mock_validation.return_value = (True, pd.DataFrame(columns=[], index=[]))
mock_update_submission = mocker.patch("services.submission_processor.update_submission")
mock_update_submission.return_value = None
await submission_processor.validate_and_update_submission(pd.DataFrame(), "123456790", "1", "0.1.0")
assert mock_update_submission.mock_calls[0].args[0].state == "VALIDATION_SUCCESSFUL"

async def test_validate_and_update_warnings(self, mocker: MockerFixture):
mock_validation = mocker.patch("services.submission_processor.validate_phases")
mock_validation.return_value = (False, pd.DataFrame([["warning"]], columns=["validation_severity"]))
mock_update_submission = mocker.patch("services.submission_processor.update_submission")
mock_update_submission.return_value = None
await submission_processor.validate_and_update_submission(pd.DataFrame(), "123456790", "1", "0.1.0")
assert mock_update_submission.mock_calls[0].args[0].state == "VALIDATION_WITH_WARNINGS"

async def test_validate_and_update_errors(self, mocker: MockerFixture):
mock_validation = mocker.patch("services.submission_processor.validate_phases")
mock_validation.return_value = (False, pd.DataFrame([["error"]], columns=["validation_severity"]))
mock_update_submission = mocker.patch("services.submission_processor.update_submission")
mock_update_submission.return_value = None
await submission_processor.validate_and_update_submission(pd.DataFrame(), "123456790", "1", "0.1.0")
assert mock_update_submission.mock_calls[0].args[0].state == "VALIDATION_WITH_ERRORS"
Loading