Commit

…ia-tools-pipeline into fix_missing_folder_error
patrick-troy committed Feb 13, 2025
2 parents 946fe48 + 1de5c55 commit 034e1db
Showing 16 changed files with 982 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .env.sample
@@ -3,6 +3,6 @@ INPUT_LOCATION=Path/to/input_files
WORKSPACE_LOCATION=Path/to/workspace_files
SHARED_LOCATION=Path/to/shared_files
EXTERNAL_DATA_LOCATION=Path/to/external/input_files
-ALLOWED_DATASETS=Dataset codes separated by comma (no spaces) e.g. ssda903,cin,annex_a
+ALLOWED_DATASETS=Dataset codes separated by comma (no spaces) e.g. ssda903,cin,annex_a,pnw_census
CLEAN_SCHEDULE=Desired schedule to run clean job in cron format e.g. 0 0 * * *
REPORTS_SCHEDULE=Desired schedule to run reports job in cron format e.g. 0 0 * * *
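
For context on this change, a minimal sketch of how ALLOWED_DATASETS might be consumed — a comma-separated list with no spaces, now including pnw_census. The helper below is hypothetical, not the pipeline's own loader:

import os

def allowed_datasets() -> list[str]:
    # Split the comma-separated codes, dropping empty entries.
    raw = os.environ.get("ALLOWED_DATASETS", "")
    return [code for code in raw.split(",") if code]

# e.g. with ALLOWED_DATASETS=ssda903,cin,annex_a,pnw_census
if "pnw_census" in allowed_datasets():
    print("PNW census pipeline enabled")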
30 changes: 0 additions & 30 deletions liiatools/annex_a_pipeline/stream_filters.py

This file was deleted.

3 changes: 1 addition & 2 deletions liiatools/annex_a_pipeline/stream_pipeline.py
@@ -2,7 +2,6 @@

from sfdata_stream_parser.filters import generic

-from liiatools.annex_a_pipeline import stream_filters
from liiatools.common import stream_filters as stream_functions
from liiatools.common.data import DataContainer, FileLocator, ProcessResult
from liiatools.common.spec.__data_schema import DataSchema
@@ -26,7 +25,7 @@ def task_cleanfile(src_file: FileLocator, schema: DataSchema) -> ProcessResult:
    # Configure stream
    stream = stream_functions.add_table_name(stream, schema=schema)
    stream = stream_functions.inherit_property(stream, ["table_name", "table_spec"])
-    stream = stream_filters.convert_column_header_to_match(stream, schema=schema)
+    stream = stream_functions.convert_column_header_to_match(stream, schema=schema)
    stream = stream_functions.match_config_to_cell(stream, schema=schema)

    logger.info("Stream for file %s configured", src_file.name)
12 changes: 11 additions & 1 deletion liiatools/common/reference/_authorities.yml
@@ -42,4 +42,14 @@ data_codes:
Peterborough: "874"
Southend-on-Sea: "882"
Suffolk: "935"
Thurrock: "883"
Thurrock: "883"
Bolton: "350"
Bury: "351"
Manchester: "352"
Oldham: "353"
Rochdale: "354"
Salford: "355"
Stockport: "356"
Tameside: "357"
Trafford: "358"
Wigan: "359"
23 changes: 23 additions & 0 deletions liiatools/common/stream_filters.py
@@ -660,3 +660,26 @@ def _create_regex_spec(field: str, file: Path) -> str | None:
regex_spec = p.get("value")

return regex_spec


@streamfilter(check=type_check(events.Cell), fail_function=pass_event)
def convert_column_header_to_match(event, schema: DataSchema):
"""
Converts the column header to the correct column header it was matched with e.g. Age -> Age of Child (Years)
:param event: A filtered list of event objects of type Cell
:param schema: The data schema in a DataSchema class
:return: An updated list of event objects
"""
if hasattr(event, "table_name") and hasattr(event, "header"):
column_config = schema.table.get(event.table_name)
for column in column_config:
if column_config[column].header_regex is not None:
for regex in column_config[column].header_regex:
parse = Column().parse_regex(regex)
if parse.match(event.header) is not None:
return event.from_event(event, header=column)
elif column.lower().strip() == event.header.lower().strip():
return event.from_event(event, header=column)
else:
logger.debug('No match found for header "%s"', event.header)
return event
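
As a quick illustration of the matching rule above — regex headers first, then a case- and whitespace-insensitive exact match — here is a self-contained sketch. It uses plain re patterns and a hypothetical schema fragment rather than the spec's Column().parse_regex format:

import re

# Hypothetical column config: schema column name -> compiled header regexes
# (None means no header_regex, so only the exact-match branch applies).
column_config = {
    "Age of Child (Years)": [re.compile(r".*age.*", re.IGNORECASE)],
    "Gender": None,
}

def normalise_header(header: str) -> str:
    for column, patterns in column_config.items():
        if patterns is not None:
            if any(p.match(header) for p in patterns):
                return column
        elif column.lower().strip() == header.lower().strip():
            return column
    return header  # no match: the header passes through unchanged

assert normalise_header("Age") == "Age of Child (Years)"
assert normalise_header(" gender ") == "Gender"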
Empty file.
47 changes: 47 additions & 0 deletions liiatools/pnw_census_pipeline/spec/__init__.py
@@ -0,0 +1,47 @@
import logging
from functools import lru_cache
from pathlib import Path

from pydantic_yaml import parse_yaml_file_as
from ruamel.yaml import YAML

from liiatools.common.data import PipelineConfig
from liiatools.common.spec.__data_schema import Category, Column, DataSchema

yaml = YAML()
yaml.preserve_quotes = True

__all__ = ["load_schema", "DataSchema", "Category", "Column"]

logger = logging.getLogger(__name__)

SCHEMA_DIR = Path(__file__).parent


@lru_cache
def load_pipeline_config():
    """
    Load the pipeline config file
    :return: Parsed pipeline config file
    """
    # pipeline.json can be read by the YAML parser because JSON is a subset of YAML
    with open(SCHEMA_DIR / "pipeline.json", "rt") as f:
        return parse_yaml_file_as(PipelineConfig, f)


@lru_cache
def load_schema() -> DataSchema:
    """
    Load the data schema file
    :return: The data schema in a DataSchema class
    """
    schema_path = Path(SCHEMA_DIR, "pnw_census_schema.yml")

    # If the schema file is missing, raise an error
    if not schema_path.exists():
        raise ValueError("No schema file found")

    with open(schema_path, "r", encoding="utf-8") as file:
        full_schema = yaml.load(file)

    # Parse the full schema dict into a DataSchema object
    return DataSchema(**full_schema)
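
A usage sketch for these loaders (the attribute names below assume the PipelineConfig and DataSchema models mirror pipeline.json and the schema file; treat them as illustrative):

from liiatools.pnw_census_pipeline.spec import load_pipeline_config, load_schema

# Both loaders are wrapped in lru_cache, so repeated calls reuse the parsed objects.
config = load_pipeline_config()  # parsed pipeline.json
schema = load_schema()           # parsed pnw_census_schema.yml

# Illustrative inspection, assuming these model fields exist.
print(config.retention_period)   # e.g. {'PAN': 7}
print(list(schema.table))        # table names, e.g. ['pnw_census']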
170 changes: 170 additions & 0 deletions liiatools/pnw_census_pipeline/spec/pipeline.json
@@ -0,0 +1,170 @@
{
  "retention_columns": {
    "year_column": "Year",
    "la_column": "LA"
  },
  "retention_period": {
    "PAN": 7
  },
  "la_signed": {
    "Bolton": {
      "PAN": "No"
    },
    "Bury": {
      "PAN": "No"
    },
    "Manchester": {
      "PAN": "No"
    },
    "Oldham": {
      "PAN": "No"
    },
    "Rochdale": {
      "PAN": "No"
    },
    "Salford": {
      "PAN": "No"
    },
    "Stockport": {
      "PAN": "No"
    },
    "Tameside": {
      "PAN": "No"
    },
    "Trafford": {
      "PAN": "No"
    },
    "Wigan": {
      "PAN": "No"
    }
  },
  "table_list": [
    {
      "id": "pnw_census",
      "retain": [
        "PAN"
      ],
      "columns": [
        {
          "id": "Looked after child?",
          "type": "category"
        },
        {
          "id": "Placing Authority",
          "type": "category"
        },
        {
          "id": "Identifier",
          "type": "alphanumeric",
          "unique_key": true
        },
        {
          "id": "Age",
          "type": "integer"
        },
        {
          "id": "Gender",
          "type": "category"
        },
        {
          "id": "Ethnic Group",
          "type": "category"
        },
        {
          "id": "How the placement was sourced",
          "type": "category"
        },
        {
          "id": "Placement start date",
          "type": "date",
          "sort": 0
        },
        {
          "id": "Organisation",
          "type": "alphanumeric"
        },
        {
          "id": "Provider ID",
          "type": "alphanumeric"
        },
        {
          "id": "Establishment",
          "type": "alphanumeric"
        },
        {
          "id": "Registration type",
          "type": "category"
        },
        {
          "id": "Establishment registration URN",
          "type": "alphanumeric"
        },
        {
          "id": "Host Authority",
          "type": "category"
        },
        {
          "id": "Primary Placing at Distance Reason",
          "type": "category"
        },
        {
          "id": "Type of provision",
          "type": "category"
        },
        {
          "id": "UASC",
          "type": "category"
        },
        {
          "id": "Total weekly cost",
          "type": "float"
        },
        {
          "id": "Contribution from Social Care",
          "type": "float"
        },
        {
          "id": "Contribution from Education",
          "type": "float"
        },
        {
          "id": "Contribution from Health",
          "type": "float"
        },
        {
          "id": "SEND",
          "type": "alphanumeric"
        },
        {
          "id": "Primary SEND category",
          "type": "alphanumeric",
          "exclude": [
            "PAN"
          ]
        },
        {
          "id": "Primary SEND need",
          "type": "alphanumeric",
          "exclude": [
            "PAN"
          ]
        },
        {
          "id": "LA",
          "type": "string",
          "enrich": "la_name"
        },
        {
          "id": "Month",
          "type": "integer",
          "enrich": "month"
        },
        {
          "id": "Year",
          "type": "integer",
          "enrich": "year"
        }
      ]
    }
  ]
}
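
To show how the retention settings above combine, here is a minimal sketch — not the pipeline's own code — that keeps tables retained for PAN and drops columns that exclude it. The path is the repository-relative location of the file above:

import json
from pathlib import Path

config = json.loads(
    Path("liiatools/pnw_census_pipeline/spec/pipeline.json").read_text()
)

for table in config["table_list"]:
    if "PAN" not in table.get("retain", []):
        continue  # table is not part of the PAN retention output
    kept = [c["id"] for c in table["columns"] if "PAN" not in c.get("exclude", [])]
    print(f"{table['id']}: {len(kept)} of {len(table['columns'])} columns kept for PAN")
    # pnw_census: 25 of 27 columns kept for PAN (the two Primary SEND columns are excluded)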