duck.py
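
"""A whirlwind tour of the Common Crawl columnar url index, using duckdb.

Points duckdb at one crawl's cc-index parquet files (either local copies
or streamed over CloudFront), counts the records, fetches a single
example row with retries, writes that row to a local parquet file, and
prints it as key-value pairs and as a cdxj line.
"""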
import time
import glob
import json
import os.path
import sys
import gzip

import duckdb


def index_download_advice(prefix, crawl):
    '''Print the aws cli commands needed to sync this crawl's index locally.'''
    print('Do you need to download this index?')
    print(f'  mkdir -p {prefix}/commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/')
    print(f'  cd {prefix}/commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/')
    print(f'  aws s3 sync s3://commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/ .')
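

# a cdxj line is the SURT key, a YYYYMMDDhhmmss timestamp, and a json object:
#   org,wikipedia,an)/wiki/escopete <timestamp> {"url": "https://an.wikipedia.org/wiki/Escopete", ...}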
def print_row_as_cdxj(row):
    '''Print each row of a duckdb relation as a cdxj index line.'''
    df = row.fetchdf()
    for ro in df.itertuples(index=False):
        d = ro._asdict()
        cdxjd = {
            'url': d['url'],
            'mime': d['content_mime_type'],
            'status': str(d['fetch_status']),
            'digest': 'sha1:' + d['content_digest'],
            'length': str(d['warc_record_length']),
            'offset': str(d['warc_record_offset']),
            'filename': d['warc_filename'],
        }
        timestamp = d['fetch_time'].isoformat(sep='T')
        # drop the iso8601 separators and timezone to get a cdx-style timestamp
        timestamp = timestamp.translate(str.maketrans('', '', '-T :Z')).replace('+0000', '')
        print(d['url_surtkey'], timestamp, json.dumps(cdxjd))


def print_row_as_kv_list(row):
    '''Print every column of each row as an indented key-value pair.'''
    df = row.fetchdf()
    for ro in df.itertuples(index=False):
        d = ro._asdict()
        for k, v in d.items():
            print(' ', k, v)
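

# the ways this script knows to point duckdb at the index; the two *_glob
# entries are kept as documented dead ends (see get_files below)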
all_algos = ('s3_glob', 'local_files', 'ccf_local_files', 'cloudfront_glob', 'cloudfront')


def get_files(algo, crawl):
    '''Return the list (or glob) of index parquet files for one crawl.'''
    if algo == 's3_glob':
        # 403 errors with and without credentials. you have to be commoncrawl-pds
        files = f's3://commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/*.parquet'
        raise NotImplementedError('will cause a 403')
    elif algo == 'local_files':
        files = os.path.expanduser(f'~/commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/*.parquet')
        files = glob.glob(files)
        # did we already download? we expect 300 files of about a gigabyte each
        if len(files) < 250:
            index_download_advice('~', crawl)
            sys.exit(1)
    elif algo == 'ccf_local_files':
        files = glob.glob(f'/home/cc-pds/commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/*.parquet')
        if len(files) < 250:
            index_download_advice('/home/cc-pds', crawl)
            sys.exit(1)
    elif algo == 'cloudfront_glob':
        # duckdb can't glob this, same reason as s3_glob above
        files = f'https://data.commoncrawl.org/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/*.parquet'
        raise NotImplementedError('duckdb will throw an error because it cannot glob this')
    elif algo == 'cloudfront':
        # the same files as s3://commoncrawl/..., but over plain https
        external_prefix = f'https://data.commoncrawl.org/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/'
        # a gzipped local list of this crawl's parquet filenames, one per line
        file_file = f'{crawl}.warc.paths.gz'
        with gzip.open(file_file, mode='rt', encoding='utf8') as fd:
            files = fd.read().splitlines()
        files = [external_prefix + f for f in files]
    else:
        raise NotImplementedError('algo: ' + algo)
    return files
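

# reads over CloudFront occasionally fail with transient 403s or truncated
# files ('No magic bytes'), so every query below is wrapped in a retry loop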
def main(algo, crawl):
    files = get_files(algo, crawl)

    retries_left = 100
    while True:
        try:
            ccindex = duckdb.read_parquet(files, hive_partitioning=True)
            break
        except (duckdb.HTTPException, duckdb.InvalidInputException) as e:
            # read_parquet exception seen: HTTPException("HTTP Error: HTTP GET error on 'https://...' (HTTP 403)")
            # duckdb.duckdb.InvalidInputException: Invalid Input Error: No magic bytes found at end of file 'https://...'
            print('read_parquet exception seen:', repr(e), file=sys.stderr)
            if retries_left:
                print('sleeping for 60s', file=sys.stderr)
                time.sleep(60)
                retries_left -= 1
            else:
                raise

    duckdb.sql('SET enable_progress_bar = true;')
    duckdb.sql('SET http_retries = 100;')
    #duckdb.sql("SET enable_http_logging = true; SET http_logging_output = 'duck.http.log'")

    print('total records for crawl:', crawl)
    retries_left = 100
    while True:
        try:
            print(duckdb.sql('SELECT COUNT(*) FROM ccindex;'))
            break
        except duckdb.InvalidInputException as e:
            # duckdb.duckdb.InvalidInputException: Invalid Input Error: No magic bytes found at end of file 'https://...'
            print('duckdb exception seen:', repr(e), file=sys.stderr)
            if retries_left:
                print('sleeping for 10s', file=sys.stderr)
                time.sleep(10)
                retries_left -= 1
            else:
                raise
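
    # look up a single page: subset and crawl match the hive partition
    # columns, and the url_host_* predicates help duckdb prune files and
    # row groups before it checks the exact url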
    sq2 = f'''
    select
      *
    from ccindex
    where subset = 'warc'
      and crawl = '{crawl}'
      and url_host_tld = 'org' -- help the query optimizer
      and url_host_registered_domain = 'wikipedia.org' -- ditto
      and url = 'https://an.wikipedia.org/wiki/Escopete'
    ;
    '''
    row2 = duckdb.sql(sq2)
    print('our one row')
    retries_left = 100
    while True:
        try:
            row2.show()
            break
        except duckdb.InvalidInputException as e:
            # duckdb.duckdb.InvalidInputException: Invalid Input Error: No magic bytes found at end of file 'https://...'
            print('duckdb exception seen:', repr(e), file=sys.stderr)
            if retries_left:
                print('sleeping for 10s', file=sys.stderr)
                time.sleep(10)
                retries_left -= 1
            else:
                raise
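
    # cache the row in a local parquet file so the rest of the tour
    # doesn't have to go back over the network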
    print('writing our one row to a local parquet file, whirlwind.parquet')
    row2.write_parquet('whirlwind.parquet')

    cclocal = duckdb.read_parquet('whirlwind.parquet')
    print('total records for local whirlwind.parquet should be 1')
    print(duckdb.sql('SELECT COUNT(*) FROM cclocal;'))

    sq3 = sq2.replace('ccindex', 'cclocal')
    row3 = duckdb.sql(sq3)
    print('our one row, locally')
    row3.show()

    print('complete row:')
    print_row_as_kv_list(row3)
    print('')
    print('equivalent to cdxj:')
    print_row_as_cdxj(row3)
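

# example invocations:
#   python duck.py              # default algo: cloudfront
#   python duck.py local_files  # use a locally synced copy of the index
#   python duck.py help         # list all algos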
if __name__ == '__main__':
    crawl = 'CC-MAIN-2024-22'
    if len(sys.argv) > 1:
        algo = sys.argv[1]
        if algo == 'help':
            print('possible algos:', all_algos)
            sys.exit(1)
    else:
        algo = 'cloudfront'
    print('using algo:', algo)
    main(algo, crawl)