378 update submission processing to use polars data validator #394

Open · wants to merge 11 commits into base: main
7 changes: 4 additions & 3 deletions Dockerfile
@@ -1,9 +1,9 @@
-FROM ghcr.io/cfpb/regtech/sbl/python-alpine:3.12
-
+FROM --platform=amd64 ghcr.io/cfpb/regtech/sbl/python-ubi8:3.12
 ENV UVICORN_LOG_LEVEL=info

 WORKDIR /usr/app
+RUN mkdir upload

 RUN pip install poetry

 COPY --chown=sbl:sbl poetry.lock pyproject.toml alembic.ini README.md ./
@@ -19,7 +19,8 @@ RUN chmod -R 447 /usr/app/upload
 WORKDIR /usr/app/src

 EXPOSE 8888
+#RUN groupadd --system sbl && useradd --system --create-home sbl -s /sbin/nologin -g sbl

-USER sbl
+#USER sbl

 CMD uvicorn sbl_filing_api.main:app --host 0.0.0.0 --port 8888 --log-config log-config.yml --log-level $UVICORN_LOG_LEVEL --timeout-keep-alive 65
3,462 changes: 2,402 additions & 1,060 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -14,8 +14,8 @@ asyncpg = "^0.29.0"
 regtech-api-commons = {git = "https://github.com/cfpb/regtech-api-commons.git"}
 regtech-data-validator = {git = "https://github.com/cfpb/regtech-data-validator.git"}
 regtech-regex = {git = "https://github.com/cfpb/regtech-regex.git"}
+boto3 = "~1.34.0"
Collaborator commented:

why the downgrade? fsspec related?

@jcadam14 (Contributor, Author) replied on Nov 7, 2024:

Yeah, with the boto3 and fsspec tie-in in the data-validator. But if I remove that to have the download send back a df, this dependency goes away. Though I'd need to rethink reading in the CSV. Right now the filing api just sends over a path and the data validator decides what to do based on path type. s3fs needs the 1.34 version of boto3.
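For context, a minimal sketch of the path-type dispatch described above (illustrative only, not the validator's actual code): fsspec resolves a filesystem implementation from the path's scheme, and "s3://" paths are routed through s3fs, the dependency that currently requires the boto3 1.34 line.

```python
# Illustrative sketch only -- not the data-validator's real code.
# fsspec picks a filesystem implementation from the path's scheme:
# a bare path reads from local disk, an "s3://" path goes through s3fs.
import fsspec

def read_csv_bytes(path: str) -> bytes:
    # Works for both "upload/2024/LEI/1.csv" and
    # "s3://bucket/upload/2024/LEI/1.csv" without any branching here.
    with fsspec.open(path, mode="rb") as f:
        return f.read()
```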

 python-multipart = "^0.0.12"
-boto3 = "^1.35.25"
 alembic = "^1.13.3"
 async-lru = "^2.0.4"
 ujson = "^5.10.0"
22 changes: 11 additions & 11 deletions src/log-config.yml
@@ -20,30 +20,30 @@ handlers:
     class: logging.StreamHandler
     stream: ext://sys.stdout
 loggers:
-  uvicorn:
+  regtech_data_validator:
     level: INFO
     handlers:
       - default
-    propagate: no
-  uvicorn.error:
-    level: INFO
-  uvicorn.access:
+    propagate: false
+  regtech_api_commons:
     level: INFO
     handlers:
-      - access
-    propagate: no
-  regtech_data_validator:
+      - default
+    propagate: false
+  sbl_filing_api:
     level: INFO
     handlers:
       - default
     propagate: false
-  regtech_api_commons:
+  uvicorn:
     level: INFO
     handlers:
       - default
     propagate: false
-  sbl_filing_api:
+  uvicorn.error:
     level: INFO
+  uvicorn.access:
+    level: INFO
     handlers:
-      - default
+      - access
     propagate: false
3 changes: 2 additions & 1 deletion src/sbl_filing_api/config.py
@@ -49,6 +49,7 @@ class Settings(BaseSettings):
     conn: PostgresDsn | None = None

     fs_upload_config: FsUploadConfig
+
     server_config: ServerConfig = ServerConfig()

     submission_file_type: str = "text/csv"
@@ -61,7 +62,7 @@ class Settings(BaseSettings):

     max_validation_errors: int = 1000000
     max_json_records: int = 10000
-    max_json_group_size: int = 0
+    max_json_group_size: int = 200

     def __init__(self, **data):
         super().__init__(**data)
5 changes: 5 additions & 0 deletions src/sbl_filing_api/routers/filing.py
@@ -1,5 +1,7 @@
 import asyncio
 import logging
+import csv
+import io

 from concurrent.futures import ProcessPoolExecutor
 from fastapi import Depends, Request, UploadFile, status
@@ -193,6 +195,9 @@ async def upload_file(request: Request, lei: str, period_code: str, file: Upload
         )

         submission.state = SubmissionState.SUBMISSION_UPLOADED
+        with io.BytesIO(content) as byte_stream:
+            reader = csv.reader(io.TextIOWrapper(byte_stream))
+            submission.total_records = sum(1 for row in reader) - 1
         submission = await repo.update_submission(request.state.db_session, submission)
     except Exception as e:
         submission.state = SubmissionState.UPLOAD_FAILED
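The record count above is taken while the raw bytes are still in hand: stream them through csv.reader and subtract one for the header row. A minimal standalone sketch of that counting logic (assuming every upload carries a header row; an empty file would yield -1, which production code may want to guard against):

```python
# Sketch of the row-counting approach above (assumes a header row).
import csv
import io

def count_records(content: bytes) -> int:
    with io.BytesIO(content) as byte_stream:
        reader = csv.reader(io.TextIOWrapper(byte_stream))
        return sum(1 for _ in reader) - 1  # subtract the header row

print(count_records(b"uid,app_date\n1,2024-01-01\n2,2024-01-02\n"))  # prints 2
```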
85 changes: 52 additions & 33 deletions src/sbl_filing_api/services/submission_processor.py
@@ -1,19 +1,18 @@
-from typing import Generator
-import pandas as pd
+import polars as pl
 import importlib.metadata as imeta
 import logging

-from io import BytesIO
 from fastapi import UploadFile
-from regtech_data_validator.create_schemas import validate_phases
+from regtech_data_validator.validator import validate_batch_csv
 from regtech_data_validator.data_formatters import df_to_dicts, df_to_download
 from regtech_data_validator.checks import Severity
-from regtech_data_validator.validation_results import ValidationResults, ValidationPhase
+from regtech_data_validator.validation_results import ValidationPhase, ValidationResults
 from sbl_filing_api.entities.engine.engine import SessionLocal
 from sbl_filing_api.entities.models.dao import SubmissionDAO, SubmissionState
 from sbl_filing_api.entities.repos.submission_repo import update_submission
 from http import HTTPStatus
-from sbl_filing_api.config import settings
+from sbl_filing_api.config import FsProtocol, settings
 from sbl_filing_api.services import file_handler
 from regtech_api_commons.api.exceptions import RegTechHttpException

@@ -59,6 +58,13 @@ def get_from_storage(period_code: str, lei: str, file_identifier: str, extension
     ) from e


+def generate_file_path(period_code: str, lei: str, file_identifier: str, extension: str = "csv"):
+    file_path = f"{settings.fs_upload_config.root}/upload/{period_code}/{lei}/{file_identifier}.{extension}"
+    if settings.fs_upload_config.protocol == FsProtocol.S3.value:
+        file_path = "s3://" + file_path
+    return file_path
+
+
 async def validate_and_update_submission(
     period_code: str, lei: str, submission: SubmissionDAO, content: bytes, exec_check: dict
 ):
@@ -69,33 +75,45 @@ async def validate_and_update_submission(
         submission.state = SubmissionState.VALIDATION_IN_PROGRESS
         submission = await update_submission(session, submission)

-        df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)
-        submission.total_records = len(df)
+        file_path = generate_file_path(period_code, lei, submission.id)

+        final_phase = ValidationPhase.LOGICAL
+        all_findings = []
+        final_df = pl.DataFrame()

+        for validation_results in validate_batch_csv(
+            file_path,
+            context={"lei": lei},
+            batch_size=50000,
+            batch_count=1,
+            max_errors=settings.max_validation_errors,
+        ):
+            final_phase = validation_results.phase
+            all_findings.append(validation_results)

-        # Validate Phases
-        results = validate_phases(df, {"lei": lei}, max_errors=settings.max_validation_errors)
+        if all_findings:
+            final_df = pl.concat([v.findings for v in all_findings], how="diagonal")

-        submission.validation_results = build_validation_results(results)
+        submission.validation_results = build_validation_results(final_df, all_findings, final_phase)

-        if results.findings.empty:
+        if final_df.is_empty():
             submission.state = SubmissionState.VALIDATION_SUCCESSFUL
         elif (
-            results.phase == ValidationPhase.SYNTACTICAL
+            final_phase == ValidationPhase.SYNTACTICAL
             or submission.validation_results["logic_errors"]["total_count"] > 0
         ):
             submission.state = SubmissionState.VALIDATION_WITH_ERRORS
         else:
             submission.state = SubmissionState.VALIDATION_WITH_WARNINGS

         submission_report = df_to_download(
-            results.findings,
-            warning_count=results.warning_counts.total_count,
-            error_count=results.error_counts.total_count,
+            final_df,
+            warning_count=sum([r.warning_counts.total_count for r in all_findings]),
+            error_count=sum([r.error_counts.total_count for r in all_findings]),
             max_errors=settings.max_validation_errors,
         )
-        upload_to_storage(
-            period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
-        )
+        upload_to_storage(period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report)

         if not exec_check["continue"]:
             log.warning(f"Submission {submission.id} is expired, will not be updating final state with results.")
@@ -114,15 +132,16 @@
         await update_submission(session, submission)


-def build_validation_results(results: ValidationResults):
-    val_json = df_to_dicts(results.findings, settings.max_json_records, settings.max_json_group_size)
-    if results.phase == ValidationPhase.SYNTACTICAL:
+def build_validation_results(final_df: pl.DataFrame, results: list[ValidationResults], final_phase: ValidationPhase):
+    val_json = df_to_dicts(final_df, settings.max_json_records, settings.max_json_group_size)
+    if final_phase == ValidationPhase.SYNTACTICAL:
+        syntax_error_counts = sum([r.error_counts.single_field_count for r in results])
         val_res = {
             "syntax_errors": {
-                "single_field_count": results.error_counts.single_field_count,
-                "multi_field_count": results.error_counts.multi_field_count,  # this will always be zero for syntax errors
-                "register_count": results.error_counts.register_count,  # this will always be zero for syntax errors
-                "total_count": results.error_counts.total_count,
+                "single_field_count": syntax_error_counts,
+                "multi_field_count": 0,  # this will always be zero for syntax errors
+                "register_count": 0,  # this will always be zero for syntax errors
+                "total_count": syntax_error_counts,
                 "details": val_json,
             }
         }
@@ -138,17 +157,17 @@ def build_validation_results(results: ValidationResults):
                 "details": [],
             },
             "logic_errors": {
-                "single_field_count": results.error_counts.single_field_count,
-                "multi_field_count": results.error_counts.multi_field_count,
-                "register_count": results.error_counts.register_count,
-                "total_count": results.error_counts.total_count,
+                "single_field_count": sum([r.error_counts.single_field_count for r in results]),
+                "multi_field_count": sum([r.error_counts.multi_field_count for r in results]),
+                "register_count": sum([r.error_counts.register_count for r in results]),
+                "total_count": sum([r.error_counts.total_count for r in results]),
                 "details": errors_list,
             },
             "logic_warnings": {
-                "single_field_count": results.warning_counts.single_field_count,
-                "multi_field_count": results.warning_counts.multi_field_count,
-                "register_count": results.warning_counts.register_count,
-                "total_count": results.warning_counts.total_count,
+                "single_field_count": sum([r.warning_counts.single_field_count for r in results]),
+                "multi_field_count": sum([r.warning_counts.multi_field_count for r in results]),
+                "register_count": sum([r.warning_counts.register_count for r in results]),
+                "total_count": sum([r.warning_counts.total_count for r in results]),
                 "details": warnings_list,
             },
         }
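The heart of the change: validation now streams, consuming validate_batch_csv as a generator and merging each batch's findings afterward. A toy sketch of that aggregation pattern (illustrative frames stand in for the validator's real findings; pl.concat with how="diagonal" unions columns that don't appear in every batch, filling the gaps with nulls):

```python
# Sketch of the aggregation pattern above, with toy frames standing in
# for per-batch validator findings.
import polars as pl

batches = [
    pl.DataFrame({"validation_id": ["E0001"], "record_no": [3]}),
    pl.DataFrame({"validation_id": ["W0003"], "field_name": ["uid"]}),
]

final_df = pl.DataFrame()
if batches:
    # how="diagonal" unions the columns; missing values become null.
    final_df = pl.concat(batches, how="diagonal")

print(final_df.shape)  # (2, 3): validation_id, record_no, field_name
```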
2 changes: 1 addition & 1 deletion tests/app/test_config.py
@@ -17,7 +17,7 @@ def test_default_maxes():
     settings = Settings()
     assert settings.max_validation_errors == 1000000
     assert settings.max_json_records == 10000
-    assert settings.max_json_group_size == 0
+    assert settings.max_json_group_size == 200


 def test_default_server_configs():