diff --git a/src/mozilla_sec_eia/models/sec10k/entities.py b/src/mozilla_sec_eia/models/sec10k/entities.py
index ff523e9..41bd9d7 100644
--- a/src/mozilla_sec_eia/models/sec10k/entities.py
+++ b/src/mozilla_sec_eia/models/sec10k/entities.py
@@ -63,7 +63,126 @@ class Ex21Layout(pa.DataFrameModel):
     )
 
 
+class Sec10kCoreTable(pa.DataFrameModel):
+    """Define table structure for core SEC companies table."""
+
+    sec_company_id: Series[str] = pa.Field(
+        description="Assigned identifier for the company."
+    )
+    filename: Series[str] = pa.Field(description="Name of extracted filing.")
+    central_index_key: Series[str] = pa.Field(
+        description="Identifier of the company in SEC database."
+    )
+    report_date: Series[pa.DateTime] = pa.Field(
+        description="Report date of the record."
+    )
+    company_name: Series[str] = pa.Field(
+        description="Cleaned name of the company with legal terms expanded."
+    )
+    utility_id_eia: Series[int] = pa.Field(
+        description="EIA utility identifier for the company. Matched via record linkage model.",
+        nullable=True,  # NOTE(review): plain int cannot hold nulls in pandas; confirm dtype
+    )
+    street_address: Series[str] = pa.Field(
+        description="Street address of the company.", nullable=True
+    )
+    street_address_2: Series[str] = pa.Field(
+        description="Secondary street address of the company.", nullable=True
+    )
+    phone_number: Series[str] = pa.Field(
+        description="Phone number of company.", nullable=True
+    )
+    city: Series[str] = pa.Field(
+        description="The city where the company is located.", nullable=True
+    )
+    state: Series[str] = pa.Field(
+        description="Two letter state code where the company is located.", nullable=True
+    )
+    state_of_incorporation: Series[str] = pa.Field(
+        description="Two letter state code where the company is incorporated.", nullable=True
+    )
+    zip_code: Series[str] = pa.Field(
+        description="5 digit zip code where the company is located.", nullable=True
+    )
+    company_name_raw: Series[str] = pa.Field(
+        description="The raw company name.", nullable=True
+    )
+    date_of_name_change: Series[pa.DateTime] = pa.Field(
+        description="Date of last name change of the company.", nullable=True
+    )
+    location_of_inc: Series[str] = pa.Field(
+        description="Cleaned location of incorporation of the company.", nullable=True
+    )
+    company_name_no_legal: Series[str] = pa.Field(
+        description="Company name with legal terms stripped, e.g. LLC", nullable=True
+    )
+    company_name_mphone: Series[str] = pa.Field(
+        description="Metaphone of the company name, could be used for record linkage."
+    )
+    files_10k: Series[bool] = pa.Field(
+        description="Indicates whether the company files a 10-K."
+    )
+
+
+class Sec10kOutputTable(pa.DataFrameModel):
+    """Define table structure for output parents and subsidiaries table."""
+
+    parent_company_cik: Series[str] = pa.Field(
+        description="CIK of the company's parent company.", nullable=True
+    )
+    own_per: Series[float] = pa.Field(
+        description="Parent company's ownership percentage of the company.",
+        nullable=True,
+    )
+
+
+class EiaCompanies(pa.DataFrameModel):
+    """Define table structure for EIA owner and operator companies table."""
+
+    company_name: Series[str] = pa.Field(
+        description="Cleaned name of the owner or operator company with legal terms expanded."
+    )
+    street_address: Series[str] = pa.Field(
+        description="Street address of the company.", nullable=True
+    )
+    street_address_2: Series[str] = pa.Field(
+        description="Secondary street address of the company.", nullable=True
+    )
+    utility_id_eia: Series[int] = pa.Field(
+        description="EIA utility identifier for the company.", coerce=True
+    )
+    company_name_raw: Series[str] = pa.Field(description="The raw company name.")
+    # TODO: What type for type expression?
+    report_date: Series[pa.DateTime] = pa.Field(
+        description="Report date of the record."
+    )
+    report_year: Series[int] = pa.Field(
+        description="Report year of the record.", coerce=True
+    )
+    city: Series[str] = pa.Field(
+        description="The city where the company is located.", nullable=True
+    )
+    state: Series[str] = pa.Field(
+        description="Two letter state code where the company is located.", nullable=True
+    )
+    zip_code: Series[str] = pa.Field(
+        description="5 digit zip code where the company is located.", nullable=True
+    )
+    phone_number: Series[str] = pa.Field(
+        description="Phone number of company.", nullable=True
+    )
+    company_name_no_legal: Series[str] = pa.Field(
+        description="Company name with legal terms stripped, e.g. LLC", nullable=True
+    )
+    company_name_mphone: Series[str] = pa.Field(
+        description="Metaphone of the company name, could be used for record linkage."
+    )
+
+
 ex21_extract_type = pandera_schema_to_dagster_type(Ex21CompanyOwnership)
 basic_10k_extract_type = pandera_schema_to_dagster_type(Basic10kCompanyInfo)
 sec10k_extract_metadata_type = pandera_schema_to_dagster_type(Sec10kExtractionMetadata)
 ex21_layout_type = pandera_schema_to_dagster_type(Ex21Layout)
+eia_layout_type = pandera_schema_to_dagster_type(EiaCompanies)
+sec10k_output_layout_type = pandera_schema_to_dagster_type(Sec10kOutputTable)
+sec10k_core_layout_type = pandera_schema_to_dagster_type(Sec10kCoreTable)
diff --git a/src/mozilla_sec_eia/models/sec_eia_record_linkage/transform_eia_input.py b/src/mozilla_sec_eia/models/sec_eia_record_linkage/transform_eia_input.py
index b12ac71..4da37eb 100644
--- a/src/mozilla_sec_eia/models/sec_eia_record_linkage/transform_eia_input.py
+++ b/src/mozilla_sec_eia/models/sec_eia_record_linkage/transform_eia_input.py
@@ -10,6 +10,7 @@
     flatten_companies_across_time,
     transform_company_name,
 )
+from mozilla_sec_eia.models.sec10k.entities import eia_layout_type
 from mozilla_sec_eia.models.sec_eia_record_linkage.sec_eia_splink_config import STR_COLS
 
 EIA_COL_MAP = {
@@ -77,6 +78,7 @@ def harvest_eia861_utilities():
 @asset(
     name="core_eia__parents_and_subsidiaries",
     io_manager_key="pandas_parquet_io_manager",
+    dagster_type=eia_layout_type,
 )
 # TODO: add Dagster asset inputs for PUDL inputs instead of reading from AWS?
 def eia_rl_input_table():
diff --git a/src/mozilla_sec_eia/models/sec_eia_record_linkage/transform_sec_input.py b/src/mozilla_sec_eia/models/sec_eia_record_linkage/transform_sec_input.py
index bbdf271..930a29d 100644
--- a/src/mozilla_sec_eia/models/sec_eia_record_linkage/transform_sec_input.py
+++ b/src/mozilla_sec_eia/models/sec_eia_record_linkage/transform_sec_input.py
@@ -16,6 +16,9 @@
     flatten_companies_across_time,
     transform_company_name,
 )
+from mozilla_sec_eia.models.sec10k.entities import (
+    sec10k_output_layout_type,
+)
 from mozilla_sec_eia.models.sec10k.utils.cloud import (
     convert_ex21_id_to_filename,
 )
@@ -359,6 +362,7 @@ def transformed_basic_10k(
     },
     deps=["core_sec_10k__filers"],
     io_manager_key="pandas_parquet_io_manager",
+    dagster_type=sec10k_output_layout_type,
 )
 def out_sec_10k__parents_and_subsidiaries(
     clean_ex21_df: pd.DataFrame,
@@ -373,6 +377,7 @@ def out_sec_10k__parents_and_subsidiaries(
     sec_10k_filers_matched_df = pd.read_parquet(
         "gs://sec10k-outputs/v2/core_sec_10k__filers.parquet"
     )
+    sec_10k_filers_matched_df = sec_10k_filers_matched_df.drop(columns="record_id")
     ex21_df_with_cik = match_ex21_subsidiaries_to_filer_company(
         basic10k_df=sec_10k_filers_matched_df, ex21_df=clean_ex21_df
     )