Refactor doctor_visits: Load source file only once #1978

Open · wants to merge 32 commits from doctor_visits_refactor_for_speed into main

Conversation

@minhkhul (Contributor) commented Jun 24, 2024

Description

Currently, doctor_visits source files contain columns whose raw names vary slightly but refer to the same kind of data, which we map to standard names:

"servicedate": "ServiceDate",
"patCountyFIPS": "PatCountyFIPS",
"patHRRname": "Pat HRR Name",
"patAgeGroup": "PatAgeGroup",
"patHRRid": "Pat HRR ID"

We deal with this by:

  1. Unzip and load the source file into a dataframe
  2. Rename the columns to a standard format
  3. Rewrite the whole source file back into its original storage
  4. Reopen the source file and load it into another dataframe
  5. Continue processing the data

This PR removes the first round of loading the source file and renaming columns, replacing it with code that does the same thing while avoiding loading and writing the source file twice.
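A minimal sketch of the single-pass idea, assuming pandas and a hypothetical `load_drop` helper (the actual change lives in update_sensor.py, per the changelog below):

```python
import pandas as pd

# Mapping from raw source column names to the standard names (subset from above).
COLUMN_RENAMES = {
    "servicedate": "ServiceDate",
    "patCountyFIPS": "PatCountyFIPS",
    "patHRRname": "Pat HRR Name",
    "patAgeGroup": "PatAgeGroup",
    "patHRRid": "Pat HRR ID",
}

def load_drop(path: str) -> pd.DataFrame:
    """Read the gzipped source file once and normalize column names in memory,
    instead of rewriting the renamed file to storage and reloading it."""
    df = pd.read_csv(path, compression="gzip")
    df = df.rename(columns=COLUMN_RENAMES)
    assert not df.duplicated().any(), "Duplicated rows in source file"
    return df
```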

Changelog


  • remove modify_claims_drops related files and tests
  • replace with code that does the same thing (rename columns and check for duplicated rows) in update_sensor.py
  • Adjust unit test files accordingly: adjust test_geo_map, and change the column names in the test csv.gz file to non-standard names so that the unit test covers renaming them to the correct columns.

@minhkhul minhkhul changed the title Refactor doctor_visits for faster runtime Refactor doctor_visits: Load source file only once Jun 24, 2024
@minhkhul minhkhul requested a review from melange396 June 24, 2024 21:16
@aysim319 (Contributor) commented Jul 1, 2024

Description

Continuing to optimize doctor_visits.
Main branch run: 569 (doctor_visit_EDI_AGG_OUTPATIENT_26062024_1455CDT.log)
Refactor branch run: 94 (doctor_visit_refactored_EDI_AGG_OUTPATIENT_26062024_1455CDT.log)

Changelog

  • refactored update_sensor and moved the csv processing into a separate module (process_data)
  • using datetime date params instead of strings in process_date
  • using dask to read in the csv
  • updated test_update_sensor
  • added test_process_data
  • updated config.py
  • updated run.py to use process_data

Notes

  • also looked into vaex; while it might be more suitable since we run on single bare-metal machines, it's less suited to data wrangling and I was struggling too much to get it working (its documentation and resources are less mature); open to trying it again if warranted
  • not sure how much memory the machines have; the code currently attempts to read the entire file in one go, which may cause issues if the csv can't fit into memory (doctor_visit_EDI_AGG_OUTPATIENT_26062024_1455CDT was about 4-5 GB and worked fine even on my local machine). Unsure how to go about mitigation; one option is sketched below. Food for thought
  • thoughts on future optimization to improve write_to_csv, but this seems like a good starting point for now
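One possible mitigation for the memory concern in the second note above, sketched minimally on top of the dask approach already in this PR (path, blocksize, dtypes, and column names are illustrative, not the indicator's actual configuration):

```python
import dask.dataframe as dd

# Illustrative uncompressed path; note that for .csv.gz inputs dask cannot split
# the file, so blocksize is ignored and the whole file becomes one partition.
path = "drops/EDI_AGG_OUTPATIENT_sample.csv"

# An explicit blocksize bounds each partition, so a 4-5 GB file never has to be
# materialized in memory all at once.
df = dd.read_csv(path, blocksize="256MB", dtype={"PatCountyFIPS": str})

# Computation stays lazy until .compute(); aggregations reduce partition by partition.
daily_counts = df.groupby("ServiceDate")["Denominator"].sum().compute()
```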

@aysim319 aysim319 force-pushed the doctor_visits_refactor_for_speed branch 5 times, most recently from 167d75c to 026594b Compare July 8, 2024 22:04
more lint
@aysim319 aysim319 force-pushed the doctor_visits_refactor_for_speed branch from 026594b to bfa853a Compare July 9, 2024 14:37
@aysim319 aysim319 force-pushed the doctor_visits_refactor_for_speed branch from 4fa370a to 1b5e6d3 Compare July 11, 2024 15:19
@aysim319 aysim319 force-pushed the doctor_visits_refactor_for_speed branch from 1b5e6d3 to 9920821 Compare July 11, 2024 15:23
@aysim319 (Contributor) commented:
Output of one day of state data and cProfile output, with validation scripts:
profile_results.zip

@aysim319 aysim319 force-pushed the doctor_visits_refactor_for_speed branch from 9b27248 to 7896042 Compare July 12, 2024 16:26
@aysim319 aysim319 linked an issue Jul 30, 2024 that may be closed by this pull request
@nmdefries (Contributor) left a comment

Some style comments to get started. I'm making another pass for logic/flow.

val_isnull = df["val"].isnull()
df_val_null = df[val_isnull]
assert len(df_val_null) == 0, "sensor value is nan, check pipeline"
df = df[~val_isnull]

nit (optional): these filters (here and the two below) aren't actually changing the behavior since if there were any null or too high values, the assertion would cause the program to error out.
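To make the point concrete, a tiny sketch with toy data and names following the excerpt above: with the assert in place, the filter below it can never remove anything.

```python
import pandas as pd

# Toy frame standing in for the sensor output.
df = pd.DataFrame({"val": [1.0, 2.0], "se": [0.1, 0.2]})

val_isnull = df["val"].isnull()

# If any value were null, this assert would already stop the program...
assert val_isnull.sum() == 0, "sensor value is nan, check pipeline"

# ...so this filter is a no-op and could be dropped. (Conversely, if dropping
# bad rows rather than crashing is the desired behavior, keep the filter and
# drop the assert.)
df = df[~val_isnull]
```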

assert len(df_val_null) == 0, "sensor value is nan, check pipeline"
df = df[~val_isnull]

se_too_high = df["se"] >= 5

todo: Please add brief rationale/source for this threshold of 5 in a comment. (The threshold of 90 below is self-evident and doesn't need a comment.)

If the threshold value came from the old code and was not explained, no need to do anything here.

df = df[~sensor_too_high]

if se:
valid_cond = (df["se"] > 0) & (df["val"] > 0)

discussion: consider consistency with past behavior of this indicator and with behavior of other indicators. I know some of our other indicators reported any odd (negative, etc) values that came in from the source. I'm somewhat wary of just dropping these points.

Second, if the df["val"] > 0 part of this filter is always desired behavior, it seems like it should also be done in the else block below. Right now the if not se case doesn't do any filtering by value.

Third, is val == 0 not valid?
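For the second point, a minimal sketch of what symmetric filtering could look like (toy data; whether val == 0 should count as valid is the open question above):

```python
import logging

import pandas as pd

logger = logging.getLogger(__name__)

# Toy frame standing in for the sensor output in update_sensor.py.
df = pd.DataFrame({"val": [1.2, 0.0, -0.5], "se": [0.1, 0.0, 0.2]})
se = True  # whether standard errors are reported

# Apply the value filter in both branches so the `if not se` case is not
# silently left unfiltered.
if se:
    valid_cond = (df["se"] > 0) & (df["val"] > 0)
else:
    valid_cond = df["val"] > 0

invalid_df = df[~valid_cond]
if len(invalid_df) > 0:
    logger.info("Dropping %d rows with non-positive val or se", len(invalid_df))
df = df[valid_cond]
```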

valid_cond = (df["se"] > 0) & (df["val"] > 0)
invalid_df = df[~valid_cond]
if len(invalid_df) > 0:
logger.info("p=0, std_err=0 invalid")

todo: Please expand/clarify this logging message. Our filter is doing more than removing p == 0 and se == 0 points. This could perhaps include the current geo_id as in above assert messages and/or a count of affected rows.
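For instance, a sketch of a more descriptive message (variable names are illustrative; in update_sensor.py the geo and the invalid-row count come from the surrounding loop):

```python
import logging

logger = logging.getLogger(__name__)

# Illustrative stand-ins for values available in the surrounding loop.
geo_id = "ca"
num_invalid = 42

# Say what was filtered, where, and how many rows were affected.
logger.info(
    "Filtering out %d rows with non-positive val or se for geo_id %s",
    num_invalid,
    geo_id,
)
```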

Comment on lines +97 to +103
# out_name = format_outname(prefix, se, weekday)

# write out results
out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
if se:
assert prefix is not None, "template has no obfuscated prefix"
out_name = prefix + "_" + out_name

todo: I think we want

Suggested change
# out_name = format_outname(prefix, se, weekday)
# write out results
out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
if se:
assert prefix is not None, "template has no obfuscated prefix"
out_name = prefix + "_" + out_name
out_name = format_outname(prefix, se, weekday)


out_n = 0
for d in set(output_df["date"]):
filename = "%s/%s_%s_%s.csv" % (output_path, (d + Config.DAY_SHIFT).strftime("%Y%m%d"), geo_level, out_name)

nit (optional): for readability, prefer f-strings over % string formatting.
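For example, the filename line from the excerpt rewritten with an f-string (stand-in values below; Config.DAY_SHIFT is replaced by a local timedelta for illustration):

```python
from datetime import datetime, timedelta

# Illustrative stand-ins for the values used in the excerpt above.
output_path = "./receiving"
geo_level = "state"
out_name = "smoothed_adj_cli"
d = datetime(2024, 6, 26)
day_shift = timedelta(days=1)  # stand-in for Config.DAY_SHIFT

# f-string version of the % formatting:
filename = f"{output_path}/{(d + day_shift).strftime('%Y%m%d')}_{geo_level}_{out_name}.csv"
```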

Comment on lines +113 to +115
outfile.write("geo_id,val,se,direction,sample_size\n")

for line in single_date_df.itertuples():

suggestion: The following chunk needs a rewrite to simplify.

Use the built-in pandas to_csv. itertuples is slow and unnecessary. The checks/conversions we're doing in the itertuples loop either can be done in bulk (even before we split by date, actually) or have already been done (e.g. didn't we already multiply se by 100? Also the assertions on values).


I see Dmitry already commented on this with some example code to use.
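Dmitry's snippet isn't reproduced on this page; purely as an illustration of the to_csv approach, with toy data and illustrative filenames:

```python
import pandas as pd

# Toy frame standing in for output_df in the excerpt above.
output_df = pd.DataFrame({
    "date": ["20240626", "20240626", "20240627"],
    "geo_id": ["pa", "ny", "pa"],
    "val": [1.0, 2.0, 3.0],
    "se": [0.1, 0.2, 0.3],
    "sample_size": [10, 20, 30],
})
output_df["direction"] = None  # written but unused column

# One CSV per date via DataFrame.to_csv, replacing the manual itertuples loop.
cols = ["geo_id", "val", "se", "direction", "sample_size"]
for d, single_date_df in output_df.groupby("date"):
    single_date_df.to_csv(f"{d}_state_smoothed_cli.csv", columns=cols, index=False)
```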


# aggregate age groups (so data is unique by service date and FIPS)
df = df.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
assert np.sum(df.duplicated()) == 0, "Duplicates after age group aggregation"

suggestion: recommend moving the duplicate check to before the groupby. If we had

date, geo, age, value
1,us,18-,x
1,us,18+,x
1,us,18-,x
1,us,18+,x
...

The groupby-sum would cause us to double-count the two age groups, since we're only grouping over date and geo. So the duplicate check should come first and check date, geo, and age combos.
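A small sketch of that ordering with the reviewer's duplicated toy data (column names are illustrative, following the mapping in the PR description):

```python
import pandas as pd

# Toy frame where every (date, geo, age) row appears twice.
df = pd.DataFrame({
    "ServiceDate": ["2024-06-26"] * 4,
    "PatCountyFIPS": ["42003"] * 4,
    "PatAgeGroup": ["18-", "18+", "18-", "18+"],
    "Denominator": [5, 7, 5, 7],
})

key_cols = ["ServiceDate", "PatCountyFIPS", "PatAgeGroup"]

# Checking duplicates on date/geo/age *before* the groupby flags the 2 repeated
# rows; checking after the groupby-sum would instead fold them into silently
# doubled counts.
print(df.duplicated(subset=key_cols).sum())  # -> 2
# In the indicator this would be an assert placed before the aggregation:
# assert not df.duplicated(subset=key_cols).any(), "Duplicates before age aggregation"

# Aggregate age groups so data is unique by service date and FIPS.
df = df.groupby(["ServiceDate", "PatCountyFIPS"]).sum(numeric_only=True).reset_index()
```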

# aggregate age groups (so data is unique by service date and FIPS)
df = df.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
assert np.sum(df.duplicated()) == 0, "Duplicates after age group aggregation"
assert (df[Config.COUNT_COLS] >= 0).all().all(), "Counts must be nonnegative"

nit: don't we run this check elsewhere?

Development

Successfully merging this pull request may close these issues.

Optimize Doctor Visit
5 participants