-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbenchmark_asyncload.py
113 lines (91 loc) · 4.14 KB
/
benchmark_asyncload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""
This script is used to benchmark the performance of async load and sync load.
To simulate the real-world scenario, we add a delay to the data loading and training process.
"""
import time
from pyinstrument.profiler import Profiler as time_profiler
from parquet_loader import ParquetDataset, ParquetDataLoader
from parquet_loader.reader import AsyncParquetReader, SyncParquetReader
path = '../synthetic_data'
delay_in_seconds = 0.01
class AsyncParquetReaderWithDelays(AsyncParquetReader):
    """Async parquet reader whose preload worker sleeps before queueing each
    chunk, emulating slower storage for the benchmark."""

    def _preload(self, metas, intervals, queue):
        # Push every requested row-group slice onto the queue; forward any
        # exception to the consumer, and always finish with the end token so
        # the reader side can terminate cleanly.
        try:
            for file_index, spans in intervals.items():
                with self._open_parquet_file(metas[file_index].file_path) as parquet_file:
                    for span in spans:
                        row_count = span.local_row_end - span.local_row_start
                        chunk = (parquet_file
                                 .read_row_group(span.row_group_index)
                                 .slice(span.local_row_start, row_count))
                        time.sleep(delay_in_seconds)  # simulated I/O latency
                        queue.put(chunk, block=True)
        except Exception as exc:
            queue.put(exc)
        finally:
            queue.put(self._END_TOKEN)
class SyncParquetReaderWithDelays(SyncParquetReader):
    """Sync parquet reader that sleeps before yielding each chunk,
    emulating slower storage for the benchmark."""

    @property
    def table_iterator(self):
        # Lazily yield one sliced row group at a time, pausing first to
        # mimic I/O latency.
        for file_index, spans in self.intervals.items():
            with self._open_parquet_file(self.metas[file_index].file_path) as parquet_file:
                for span in spans:
                    row_count = span.local_row_end - span.local_row_start
                    time.sleep(delay_in_seconds)  # simulated I/O latency
                    yield parquet_file.read_row_group(span.row_group_index).slice(
                        span.local_row_start, row_count)
class ParquetDatasetWithDelays(ParquetDataset):
    """ParquetDataset wired to the delay-injecting reader variants above."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Same reader selection as the parent presumably makes, but using the
        # delayed subclasses so every read path carries the benchmark latency.
        if self.async_read and self.max_preload > 0:
            self.reader = AsyncParquetReaderWithDelays(self.max_preload)
        else:
            self.reader = SyncParquetReaderWithDelays()
def _profile_load(label, **dataset_kwargs):
    """Profile one full pass over the synthetic dataset.

    Builds a ``ParquetDatasetWithDelays`` with ``dataset_kwargs``, drains it
    through a ``ParquetDataLoader`` while sleeping per batch to emulate a
    training step, and prints a pyinstrument report tagged with ``label``.
    """
    prof = time_profiler()
    prof.start()
    dataset = ParquetDatasetWithDelays(path, **dataset_kwargs)
    dataloader = ParquetDataLoader(dataset, batch_size=66, shuffle=False)
    for _batch in dataloader:
        time.sleep(delay_in_seconds)  # simulated per-batch training work
    prof.stop()
    print(f'{label}: {prof.output_text(unicode=True, color=True)}')


def sync_load():
    """Benchmark the synchronous (blocking) reader path."""
    _profile_load('sync load')


def async_load():
    """Benchmark the asynchronous (preloading) reader path."""
    _profile_load('async load', async_read=True)
if __name__ == '__main__':
    # Run the blocking benchmark first, then the preloading one, so the two
    # profiler reports print in that order.
    for benchmark in (sync_load, async_load):
        benchmark()
#######################
### Result on my PC ###
#######################
"""
sync load:
_ ._ __/__ _ _ _ _ _/_ Recorded: 03:49:10 Samples: 11961
/_//_/// /_\ / //_// / //_'/ // Duration: 39.205 CPU time: 15.765
/ _/ v4.6.2
Program: /home/hanhui/codes/ParquetLoader/benchmarks/benchmark_asyncload.py
39.204 sync_load benchmark_asyncload.py:50
├─ 22.411 _SingleProcessDataLoaderIter.__next__ torch/utils/data/dataloader.py:625
│ [22 frames hidden] torch, parquet_loader, pyarrow, pandas
│ 21.974 ParquetDatasetWith01sDelay.iter_batch parquet_loader/dataset.py:107
│ ├─ 13.900 SyncParquetReaderWith01sDelay.table_iterator benchmark_asyncload.py:32
│ │ ├─ 10.947 sleep <built-in>
│ │ └─ 2.883 ParquetFile.read_row_group pyarrow/parquet/core.py:423
└─ 16.692 sleep <built-in>
async load:
_ ._ __/__ _ _ _ _ _/_ Recorded: 03:49:52 Samples: 10615
/_//_/// /_\ / //_// / //_'/ // Duration: 25.855 CPU time: 15.253
/ _/ v4.6.2
Program: /home/hanhui/codes/ParquetLoader/benchmarks/benchmark_asyncload.py
25.854 async_load benchmark_asyncload.py:63
├─ 16.503 sleep <built-in>
└─ 9.269 _SingleProcessDataLoaderIter.__next__ torch/utils/data/dataloader.py:625
[31 frames hidden] torch, parquet_loader, pyarrow, pandas
"""