-
Notifications
You must be signed in to change notification settings - Fork 0
/
create_parquet.py
56 lines (46 loc) · 1.42 KB
/
create_parquet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python3
"""Stream-convert a large tab-separated file (PLINK-style .raw/.traw layout)
to Parquet without loading it all into memory.

Usage: create_parquet.py <input.tsv> <output.parquet>

Sample-metadata columns (FID, IID, PAT, MAT, SEX, PHENOTYPE) are stored as
strings; every other column is assumed to be a small genotype/allele count
and is stored as int8.
"""
import sys

import pandas as pd  # NOTE(review): unused here — kept to avoid breaking external expectations; confirm and drop
import pyarrow as pa
import pyarrow.csv
import pyarrow.parquet as pq


def main() -> None:
    # Validate CLI args up front instead of crashing with IndexError.
    if len(sys.argv) != 3:
        sys.exit(f"usage: {sys.argv[0]} <input.tsv> <output.parquet>")
    csv_file = sys.argv[1]
    parquet_file = sys.argv[2]

    # Read only the header line to learn the column names; the bulk of the
    # file is then streamed by pyarrow below.
    print("reading header...")
    with open(csv_file, "r") as fd:
        header = fd.readline().rstrip().split("\t")
    print("done!")

    # Metadata columns stay as UTF-8 strings; all remaining (variant)
    # columns are forced to int8 to keep the Parquet file compact.
    non_variant_mac_columns = {"FID", "IID", "PAT", "MAT", "SEX", "PHENOTYPE"}
    types_dict = {
        **{col: pa.utf8() for col in non_variant_mac_columns},
        **{col: pa.int8() for col in header if col not in non_variant_mac_columns},
    }

    read_options = pyarrow.csv.ReadOptions(
        use_threads=True,
        column_names=header,   # names supplied explicitly, so ...
        skip_rows=1,           # ... skip the file's own header row
        block_size=2**30,      # 1 GiB blocks: few, large Parquet row groups
    )
    parse_options = pyarrow.csv.ParseOptions(delimiter="\t")
    convert_options = pyarrow.csv.ConvertOptions(column_types=types_dict)

    # The writer is created lazily from the first batch's schema; try/finally
    # guarantees it is closed (valid Parquet footer) even if a batch fails.
    writer = None
    try:
        with pyarrow.csv.open_csv(
            csv_file,
            read_options=read_options,
            parse_options=parse_options,
            convert_options=convert_options,
        ) as reader:
            for i, batch in enumerate(reader):
                print(f"Chunk {i}")
                if batch is None:
                    break
                if writer is None:
                    writer = pq.ParquetWriter(parquet_file, batch.schema)
                writer.write_table(pa.Table.from_batches([batch]))
    finally:
        # Bug fix: the original called writer.close() unconditionally and
        # crashed with AttributeError when the input produced no batches.
        if writer is not None:
            writer.close()
    print("Done!")


if __name__ == "__main__":
    main()