Skip to content

Commit

Permalink
US_BLS_Jolts_data_refresh
Browse files: browse the repository at this point in the history
  • Loading branch information
swethammkumari committed Oct 14, 2024
1 parent 4e7b584 commit c7532ae
Show file tree
Hide file tree
Showing 3 changed files with 1,396 additions and 703 deletions.
80 changes: 50 additions & 30 deletions scripts/us_bls/jolts/bls_jolts.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,51 +83,64 @@ def generate_cleaned_dataframe():
"""
# Series descriptions are used for adjustment status and industry code.
exp_series_columns = [
'series_id', 'seasonal', 'industry_code', 'state_code', 'area_code', 'sizeclass_code',
'dataelement_code', 'ratelevel_code', 'footnote_codes', 'begin_year',
'begin_period', 'end_year', 'end_period'
'series_id', 'seasonal', 'industry_code', 'state_code', 'area_code',
'sizeclass_code', 'dataelement_code', 'ratelevel_code',
'footnote_codes', 'begin_year', 'begin_period', 'end_year', 'end_period'
]

header = {'User-Agent': '[email protected]'}

series_desc = pd.read_csv("https://download.bls.gov/pub/time.series/jt/jt.series", storage_options=header,converters={'industry_code': str}, sep="\\t" )
series_desc.columns=exp_series_columns
series_desc["series_id"]=series_desc["series_id"].apply(lambda x: x.strip())

series_desc = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.series",
storage_options=header,
converters={'industry_code': str},
sep="\\t")
series_desc.columns = exp_series_columns
series_desc["series_id"] = series_desc["series_id"].apply(
lambda x: x.strip())

series_desc.to_csv("jolts_input_jt_series.csv")
assert len(series_desc.columns) == len(exp_series_columns)
assert (series_desc.columns == exp_series_columns).all()
series_desc = series_desc.set_index("series_id")

# Download various series datapoints
#job_openings = pd.read_csv(

job_openings = pd.read_csv("https://download.bls.gov/pub/time.series/jt/jt.data.2.JobOpenings", storage_options=header,sep="\\s+")

job_openings = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.2.JobOpenings",
storage_options=header,
sep="\\s+")
job_openings.to_csv("jolts_input_jt_job_openings.csv")

job_hires = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.3.Hires",
storage_options=header,sep="\\s+")
storage_options=header,
sep="\\s+")
job_hires.to_csv("jolts_input_jt_job_hires.csv")

total_seps = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.4.TotalSeparations", # pylint: disable=line-too-long
storage_options=header,sep="\\s+")
storage_options=header,
sep="\\s+")
total_seps.to_csv("jolts_input_jt_totlal_separations.csv")

total_quits = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.5.Quits",
storage_options=header,sep="\\s+")
storage_options=header,
sep="\\s+")
total_quits.to_csv("jolts_input_jt_total_quits.csv")

total_layoffs = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.6.LayoffsDischarges", # pylint: disable=line-too-long
storage_options=header,sep="\\s+")
storage_options=header,
sep="\\s+")
total_layoffs.to_csv("jolts_input_jt_total_layoffs.csv")

total_other_seps = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.7.OtherSeparations", # pylint: disable=line-too-long
storage_options=header,sep="\\s+")
storage_options=header,
sep="\\s+")
total_other_seps.to_csv("jolts_input_jt_total_other_separations.csv")

# Additional information about each dataframe.
Expand All @@ -137,17 +150,17 @@ def generate_cleaned_dataframe():
("Count_JobPosting", "schema:JobPosting", "", job_openings),
("Count_Worker_Hire", "dcs:BLSWorker", "Hire", job_hires),
("Count_Worker_Separation", "dcs:BLSWorker", "Separation", total_seps),
("Count_Worker_VoluntarySeparation", "dcs:BLSWorker", "VoluntarySeparation",
total_quits),
("Count_Worker_InvoluntarySeparation", "dcs:BLSWorker", "InvoluntarySeparation",
total_layoffs),
("Count_Worker_VoluntarySeparation", "dcs:BLSWorker",
"VoluntarySeparation", total_quits),
("Count_Worker_InvoluntarySeparation", "dcs:BLSWorker",
"InvoluntarySeparation", total_layoffs),
("Count_Worker_OtherSeparation", "dcs:BLSWorker", "OtherSeparation",
total_other_seps),
]
# Combine datasets into a single dataframe including origin of data.
jolts_df = pd.DataFrame()
job_columns = ['series_id', 'year', 'period', 'value', 'footnote_codes']

for schema_name, population_type, job_change_event, df in schema_mapping:
# Assert columns are as expected.
assert len(df.columns) == len(job_columns)
Expand All @@ -159,7 +172,7 @@ def generate_cleaned_dataframe():
df.loc[:, 'job_change_event'] = job_change_event
df.loc[:, 'population_type'] = population_type
jolts_df = jolts_df._append(df)

# Drop non-monthly data and throw away slice.
jolts_df = jolts_df.query("period != 'M13'").copy()

Expand All @@ -172,18 +185,21 @@ def period_year_to_iso_8601(row):
jolts_df['Date'] = jolts_df.apply(period_year_to_iso_8601, axis=1)

# Add relevant columns from series information.
series_cols = ['industry_code', 'state_code', 'seasonal', 'ratelevel_code','sizeclass_code']

series_cols = [
'industry_code', 'state_code', 'seasonal', 'ratelevel_code',
'sizeclass_code'
]

jolts_df = jolts_df.merge(series_desc[series_cols],
left_on=["series_id"],
right_index=True)
jolts_df.to_csv("before_query.csv",index=False)
jolts_df.to_csv("before_query.csv", index=False)
# Drop rate data, preliminary data, non-national data, and rows whose
# sizeclass_code is not 0.
jolts_df = jolts_df.query("ratelevel_code == 'L'")
jolts_df = jolts_df.query("footnote_codes != 'P'")
jolts_df = jolts_df.query("state_code == '00'")
jolts_df = jolts_df.query('sizeclass_code == 0')
jolts_df.to_csv("after_query.csv",index=False)
jolts_df.to_csv("after_query.csv", index=False)

# Map industries.
def jolts_code_map(row):
Expand Down Expand Up @@ -217,11 +233,13 @@ def row_to_stat_var(row):
lambda adj: "Adjusted" if adj == "S" else "Unadjusted")
jolts_df['StatisticalVariable'] = jolts_df.apply(row_to_stat_var, axis=1)
for old, new in _dcid_map.items():
jolts_df['StatisticalVariable'] = jolts_df['StatisticalVariable'].str.replace(old, new, regex=False)
jolts_df['StatisticalVariable'] = jolts_df[
'StatisticalVariable'].str.replace(old, new, regex=False)
jolts_df['Value'] = jolts_df['value']

return jolts_df, schema_mapping


def remap_dcid():
file_path = 'BLSJolts_StatisticalVariables.mcf'
for line in fileinput.input(file_path, inplace=True):
Expand All @@ -234,7 +252,8 @@ def remap_dcid():
line = line.replace(f_key, f_value)
print(line)
if line.startswith('naics:'):
print('\n')
print('\n')


def create_statistical_variables(jolts_df, schema_mapping):
"""Creates Statistical Variable nodes.
Expand Down Expand Up @@ -273,7 +292,7 @@ def create_statistical_variables(jolts_df, schema_mapping):
adjusted_schema = "dcs:Adjusted"
else:
adjusted_schema = "dcs:Unadjusted"

# Create new schema object.
stat_var_schema = textwrap.dedent(template_stat_var)

Expand All @@ -293,7 +312,8 @@ def create_statistical_variables(jolts_df, schema_mapping):
"{POPULATION}", pop_type).replace(
"{JOB_CHANGE_EVENT}",
job_change_event))



def main(_):
""" Executes the downloading, preprocessing, and outputting of
required MCF and CSV for JOLTS data.
Expand Down
Loading

0 comments on commit c7532ae

Please sign in to comment.