Refactor doctor_visits: Load source file only once #1978
base: main
Changes from 26 commits
@@ -0,0 +1,181 @@
"""Module providing functions for processing and wrangling data."""

from datetime import datetime
from pathlib import Path

import dask.dataframe as dd
import numpy as np
import pandas as pd

from .config import Config

def format_outname(prefix: str, se: bool, weekday: bool):
    """
    Format the output file name.

    Parameters
    ----------
    prefix: obfuscated prefix, used when writing out standard errors
    se: boolean to write out standard errors; if true, use an obfuscated name
    weekday: boolean for weekday adjustments.
        signals will be generated with weekday adjustments (True) or without
        adjustments (False)

    Returns
    -------
    outname str
    """
    out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
    if se:
        assert prefix is not None, "template has no obfuscated prefix"
        out_name = prefix + "_" + out_name
    return out_name

def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
    """
    Format the dataframe and check for anomalies before writing results.

    Parameters
    ----------
    df: dataframe of output from update_sensor
    geo_id: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
    se: boolean to write out standard errors; if true, use an obfuscated name
    logger

    Returns
    -------
    filtered and formatted dataframe
    """
    # report in percentage
    df["val"] = df["val"] * 100
    df["se"] = df["se"] * 100

    val_isnull = df["val"].isnull()
    df_val_null = df[val_isnull]
    assert len(df_val_null) == 0, "sensor value is nan, check pipeline"
    df = df[~val_isnull]

    se_too_high = df["se"] >= 5
[Review comment] todo: Please add brief rationale/source for this threshold of 5 in a comment. (The threshold of 90 below is self-evident and doesn't need a comment.) If the threshold value came from the old code and was not explained, no need to do anything here.
||||||||||||||||||
df_se_too_high = df[se_too_high] | ||||||||||||||||||
assert len(df_se_too_high) == 0, f"standard error suspiciously high! investigate {geo_id}" | ||||||||||||||||||
df = df[~se_too_high] | ||||||||||||||||||
|
||||||||||||||||||
sensor_too_high = df["val"] >= 90 | ||||||||||||||||||
df_sensor_too_high = df[sensor_too_high] | ||||||||||||||||||
assert len(df_sensor_too_high) == 0, f"standard error suspiciously high! investigate {geo_id}" | ||||||||||||||||||
df = df[~sensor_too_high] | ||||||||||||||||||
|
||||||||||||||||||
if se: | ||||||||||||||||||
valid_cond = (df["se"] > 0) & (df["val"] > 0) | ||||||||||||||||||
[Review comment] discussion: consider consistency with past behavior of this indicator and with behavior of other indicators. I know some of our other indicators reported any odd (negative, etc) values that came in from the source. I'm somewhat wary of just dropping these points. Second, if the … Third, is …
        invalid_df = df[~valid_cond]
        if len(invalid_df) > 0:
            logger.info("p=0, std_err=0 invalid")
[Review comment] todo: Please expand/clarify this logging message. Our filter is doing more than removing p=0, std_err=0 rows.
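One way the expanded message might read, as a sketch only (exact wording is the author's call; this assumes a stdlib-style logger and that `invalid_df` and `geo_id` are in scope as above):

```python
# Hypothetical sketch: name everything the filter removes (val <= 0 or
# se <= 0), not only the p=0, std_err=0 case, and report the row count.
logger.info(f"Dropping {len(invalid_df)} rows with val <= 0 or se <= 0 for {geo_id}")
```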
        df = df[valid_cond]
    else:
        df["se"] = np.NAN

    df["direction"] = np.NAN
    df["sample_size"] = np.NAN
    return df

def write_to_csv(
    output_df: pd.DataFrame, prefix: str, geo_level: str, weekday: bool, se: bool, logger, output_path="."
):
    """
    Write sensor values to csv.

    Args:
        output_df: dataframe containing sensor rates, se, unique dates, and unique geo_id
        prefix: obfuscated prefix, used when writing out standard errors
        geo_level: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
        weekday: boolean for weekday adjustments
        se: boolean to write out standard errors; if true, use an obfuscated name
        output_path: outfile path to write the csv (default is current directory)
    """
    # out_name = format_outname(prefix, se, weekday)

    # write out results
    out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
    if se:
        assert prefix is not None, "template has no obfuscated prefix"
        out_name = prefix + "_" + out_name
[Review comment on lines +97 to +103] todo: I think we want `out_name = format_outname(prefix, se, weekday)` here instead of duplicating its logic (see the sketch below).
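A minimal sketch of the suggested change, assuming the `format_outname` helper defined earlier in this file:

```python
# Sketch: derive the output name once via the helper instead of
# repeating the se/weekday branching inline.
out_name = format_outname(prefix, se, weekday)
```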

    if se:
        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")

    out_n = 0
    for d in set(output_df["date"]):
        filename = "%s/%s_%s_%s.csv" % (output_path, (d + Config.DAY_SHIFT).strftime("%Y%m%d"), geo_level, out_name)
[Review comment] nit (optional): for readability, prefer f-strings over %-style formatting.
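As a sketch, the filename line above could become (same behavior, names as in the original):

```python
# f-string equivalent of the %-formatted filename above
date_str = (d + Config.DAY_SHIFT).strftime("%Y%m%d")
filename = f"{output_path}/{date_str}_{geo_level}_{out_name}.csv"
```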
||||||||||||||||||
single_date_df = output_df[output_df["date"] == d] | ||||||||||||||||||
with open(filename, "w") as outfile: | ||||||||||||||||||
outfile.write("geo_id,val,se,direction,sample_size\n") | ||||||||||||||||||
|
||||||||||||||||||
for line in single_date_df.itertuples(): | ||||||||||||||||||
[Review comment on lines +113 to +115] suggestion: The following chunk needs a rewrite to simplify. Use the built-in DataFrame.to_csv.

[Review comment] I see Dmitry already commented on this with some example code to use.
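A rough sketch of what a `to_csv`-based rewrite might look like; the NA handling and column order here are assumptions, not the author's or Dmitry's actual code:

```python
# Sketch: let pandas format the rows instead of writing them by hand.
out = single_date_df.assign(direction="NA", sample_size="NA")
if not se:
    # for privacy reasons we will not report the standard error
    out["se"] = "NA"
out.to_csv(filename, columns=["geo_id", "val", "se", "direction", "sample_size"], index=False)
```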
                geo_id = line.geo_id
                sensor = 100 * line.val  # report percentages
                se_val = 100 * line.se
                assert not np.isnan(sensor), "sensor value is nan, check pipeline"
                assert sensor < 90, f"strangely high percentage {geo_level, sensor}"
                if not np.isnan(se_val):
                    assert se_val < 5, f"standard error suspiciously high! investigate {geo_level}"

                if se:
                    assert sensor > 0 and se_val > 0, "p=0, std_err=0 invalid"
                    outfile.write("%s,%f,%s,%s,%s\n" % (geo_id, sensor, se_val, "NA", "NA"))
                else:
                    # for privacy reasons we will not report the standard error
                    outfile.write("%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA"))
                out_n += 1
    logger.debug(f"wrote {out_n} rows for {geo_level}")

def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
    """
    Read csv using Dask, filter unneeded data, then convert back into a pandas dataframe.

    Parameters
    ----------
    filepath: path to the aggregated doctor-visits data
    startdate: first sensor date (YYYY-mm-dd)
    enddate: last sensor date (YYYY-mm-dd)
    dropdate: data drop date (YYYY-mm-dd)
    logger

    Returns
    -------
    filtered dataframe of doctor-visits data
    """
    filepath = Path(filepath)
    logger.info(f"Processing {filepath}")

    ddata = dd.read_csv(
        filepath,
        compression="gzip",
        dtype=Config.DTYPES,
        blocksize=None,
    )
    # rename inconsistent column names to match config column names
    ddata = ddata.rename(columns=Config.DEVIANT_COLS_MAP)
    ddata = ddata[Config.FILT_COLS]

    ddata = ddata.dropna()

    ddata[Config.DATE_COL] = dd.to_datetime(ddata[Config.DATE_COL])

    df = ddata.compute()

    # aggregate age groups (so data is unique by service date and FIPS)
    df = df.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
    assert np.sum(df.duplicated()) == 0, "Duplicates after age group aggregation"
[Review comment] suggestion: recommend moving the duplicate check to before the groupby. The aggregation makes the (date, geo) keys unique by construction, so the assert can never trigger where it is now (sketch below).
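A sketch of that reordering, as a judgment call rather than the reviewer's exact intent:

```python
# Sketch: check for duplicated raw rows first, where duplicates are
# actually possible; the groupby below then yields unique (date, geo) keys.
assert np.sum(df.duplicated()) == 0, "Duplicates in raw data"
df = df.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
```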
    assert (df[Config.COUNT_COLS] >= 0).all().all(), "Counts must be nonnegative"
[Review comment] nit: don't we run this check elsewhere?

    # restrict to training start and end date
    startdate = startdate - Config.DAY_SHIFT

    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
    assert startdate < enddate, "Start date >= end date"
    assert enddate <= dropdate, "End date > drop date"

    date_filter = (df[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (df[Config.DATE_COL] < dropdate)
    df = df[date_filter]
    logger.info(f"Done processing {filepath}")
    return df
[Review comment] nit (optional): these filters (here and the two below) aren't actually changing the behavior, since if there were any null or too-high values the assertion would cause the program to error out.
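If one agreed with that nit, the simplification might look like this sketch (keeping only the check, since a filter after a passing assert is a no-op):

```python
# Sketch: the assert aborts on any null sensor value, so the
# subsequent boolean filter can never remove rows; keep just the check.
assert not df["val"].isnull().any(), "sensor value is nan, check pipeline"
```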