Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

awswrangler.s3.read_parquet() chunked=True 1 dataframe per result #2086

Open
matthewyakubiw opened this issue Mar 2, 2023 · 11 comments · Fixed by #2087
Open

awswrangler.s3.read_parquet() chunked=True 1 dataframe per result #2086

matthewyakubiw opened this issue Mar 2, 2023 · 11 comments · Fixed by #2087
Assignees
Labels
question Further information is requested

Comments

@matthewyakubiw
Copy link

Hi there,

I have a question regarding the chunked=true option in awswrangler.s3.read_parquet().
I'm looking to load parquet files from S3 in the most memory efficient way possible. Our data has a differing number of rows per parquet file, but the same number of columns (11). I'd like it so the results from read_parquet() is separated as a pandas DF on a per-parquet file basis. i.e. if based on the filter_query it returns 10 parquet files, I will receive 10 pandas DFs in return. chunked=True works if the number of rows is the same every time, but with our data there will be a different number of rows from time to time, so hard-coding the chunk size isn't feasible.

The documentation says:

If chunked=True, a new DataFrame will be returned for each file in your path/dataset.

However it also seems to be choosing an arbitrary size to chunk in (in my case it's chunks of 65536)
Is there something I'm missing here with regards to this? Thanks very much for your help!

@matthewyakubiw matthewyakubiw added the question Further information is requested label Mar 2, 2023
@LeonLuttenberger
Copy link
Contributor

Hey,

Our documentation appears to be incorrect there. If chunked=True, each file will be in a separate chunk, but the maximum maximum size of that chunk will be 65,536 rows. If a single file exceeds that, the iterator will return more than one chunk for that file.

To give an example, let's say you have two files in your dataset. The first file has 70,000 rows, and the second has 50,000. By passing chunked=True, the iterator will return 3 data frames:

  1. 65,536 rows from the first file
  2. 4,464 rows from the first file
  3. 50,000 rows from the second file

Alternatively, if you explicitly pass a chunk size, rows from different files will be spliced together in order to ensure the data size. Therefore, if you pass chunked=60_000 in the previous example, the iterator will return 2 data frames:

  1. 60,000 rows from the first file
  2. 10,000 rows from the first file and 50,000 rows from the second file

I will update the documentation to better reflect the actual behavior. Let me know if this answers your question.

@matthewyakubiw
Copy link
Author

Thank you for the response! That definitely makes sense.

Is there any world where we can chunk where each chuck size equals the number of rows returned on that particular parquet file? i.e. for this data I have there are roughly ~1067300 rows per parquet file (but this will vary by a few hundred to thousand.)

If not, maybe you could recommend a solution that might be helpful for my use case. I appreciate your time and help!

@LeonLuttenberger
Copy link
Contributor

Hey,

If I understand correctly, you want exactly one chunk for each file? And each of those chunks would have 1067300 rows?

If so, you could iterate through the files you have in your dataset by using wr.s3.list_objects, and then call wr.se.read_parquet for each file separately without specifying chunked=True.

Let me know if this works for you,
Leon

@matthewyakubiw
Copy link
Author

If I understand correctly, you want exactly one chunk for each file? And each of those chunks would have 1067300 rows?

Roughly that many rows, the number of rows will vary depending (data is measurements of optical hardware has an inconsistent number of entries) - but yeah one chunk for each file would be ideal.

I will take a look at your recommendation! Thank you so much for taking the time.

@matthewyakubiw
Copy link
Author

matthewyakubiw commented Mar 3, 2023

Sorry @LeonLuttenberger one more question:
Could you explain in more detail how chunked=True is more memory efficient?
For example: without chunked=True when loading 12 parquet files it's using 11.5GB of memory (each file is roughly 8MB in S3)
With chunked=True it uses roughly 600MB. I'd like to be able to utilize this feature to save memory so I can enable fitting on this data on larger quantities without having to split it up.

@LeonLuttenberger
Copy link
Contributor

Hey,

For example: without chunked=True when loading 12 parquet files it's using 11.5GB of memory (each file is roughly 8MB in S3)
With chunked=True it uses roughly 600MB. I'd like to be able to utilize this feature to save memory so I can enable fitting on this data on larger quantities without having to split it up.

Did you mean you were assigning chunked=int in one of these two cases? Both cases say chunked=True.

11.5 GB for 12 Parquet files where each file is 8MB seems excessive. Does each have ~1067300 rows in it? Can I ask how many columns are in each, and what you used to measure the memory? I'd like to try to replicate these results to better understand whats happening.

The reason chunked=True should be more memory efficient is because when it reaches the end of a file, it simply returns whatever is left before moving onto the next file. In contrast to that, chunked=int will reach the end of one file, and then begin gathering data from the next file, while attaching it to the already loaded data. The Data Frame then needs to be copied before being returned., which increases memory usage.

The main exception to the rule that chunked=True is more memory efficient is for datasets that either have lots of rows or large values in each row (e.g. long strings). In that case, chunked=True will still try to batch ~65,000 rows into a single data frame, which may exceed your desired memory usage.

@matthewyakubiw
Copy link
Author

Did you mean you were assigning chunked=int in one of these two cases? Both cases say chunked=True.

First example was without any chunk statement (absent option from the query), and the second example was with chunked=True

11.5 GB for 12 Parquet files where each file is 8MB seems excessive. Does each have ~1067300 rows in it? Can I ask how many columns are in each, and what you used to measure the memory? I'd like to try to replicate these results to better understand whats happening.

Yeah so each has roughly that many rows, with 11 columns (always 11 columns for every single parquet file). I use jupyterlab to run the notebooks so after I ran the following codeblock, my RAM usage for the kernel was at ~11.5GB:

df = [
       wr.s3.read_parquet(
           path=self.s3_parquet_url,
           dataset=True,
           partition_filter=lambda x: self.partition_filter_wrapper(x, query),
           use_threads=True,
       )
        for query in queries
]

where queries is a list of partition filters. The reason I use a list of partition filters specifying individual partitions vs one large filter is so that I can return each of the parquet files as its own dataframe as opposed to further processing the data after to separate it.
image

Thanks!

@matthewyakubiw
Copy link
Author

Here's an example of what the data
image
looks like (with fake values)

@LeonLuttenberger
Copy link
Contributor

Hey,

Which version of awswrangler are you on? The reason I ask is because we recently fixed a memory issue, which is available as of version 2.20.0.

If you're already on 2.20.0, can you please try to run the same code with use_threads=False? My thinking is that when you specify chunksize, the function foregoes any parallelization, which normally results in higher memory usage.

Cheers,
Leon

@matthewyakubiw
Copy link
Author

Hey,
So I upgraded to 2.20.0 and tried running with use_threads equal to both False and True, and the same amount of memory was used:
image
Can I do anything else to provide you with more context that might help?

@matthewyakubiw
Copy link
Author

Is there any chance this is an M1 thing?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants