-
Notifications
You must be signed in to change notification settings - Fork 0
/
multiprocessor.py
124 lines (104 loc) · 4.66 KB
/
multiprocessor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import urllib3
import datetime
from os import path
import os
import threading
import json
from multiprocessing.pool import ThreadPool
from bs4 import BeautifulSoup
import requests
import sys
# Silence urllib3's InsecureRequestWarning: the HTTP requests issued in this
# module are sent with verify=False, which would otherwise warn on every call.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class MultiProcessor:
    """Download or process many named targets on a shared thread pool.

    Two workflows share the pool and the per-run settings stored on the
    instance:

    * ``scrape_all`` downloads one page per target into a local directory,
      skipping targets whose file already exists.
    * ``process_all`` runs a user function per target and stores the result
      as JSON, skipping targets whose output already exists unless a
      re-process is detected.

    The per-run settings (``_local_dir``, ``_target_suffix``, ...) are
    overwritten by every ``scrape_all`` / ``process_all`` call, so one
    instance should run only one of them at a time.
    """

    def __init__(self, processes=20, chunk_size=100):
        """Create the worker pool.

        Args:
            processes: Number of worker threads in the pool.
            chunk_size: ``chunksize`` passed to ``ThreadPool.imap``.
        """
        self._thread_pool = ThreadPool(processes=processes)
        self._chunk_size = chunk_size
        # Per-run settings; replaced by scrape_all() / process_all().
        self._local_dir = ""
        self._url_func = lambda x: x
        self._target_suffix = ""
        self._process_func = lambda x: x
        self._reprocess = False
        self._need_redirect = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def close(self):
        """Shut the thread pool down; the instance cannot run jobs afterwards."""
        self._thread_pool.close()
        self._thread_pool.join()

    def scrape_all(self, local_dir, target_names, url_func, early_stop_func=None,
                   target_suffix="html", need_redirect=False):
        """Download every target in ``target_names`` into ``local_dir``.

        Args:
            local_dir: Directory that receives the downloaded files; created
                if missing.
            target_names: Sliceable sequence of target identifiers.
            url_func: Maps a target name to its URL, or to ``None`` when no
                URL is known.
            early_stop_func: Optional callable that receives the text of the
                first target's downloaded file and returns how many leading
                targets to scrape. ``0`` means stop immediately.
            target_suffix: File extension (without dot) of the saved files.
            need_redirect: When true, each URL is resolved with
                ``expand_url`` before it is fetched.

        Raises:
            AssertionError: If ``early_stop_func`` returns more than 1000.
        """
        self._url_func = url_func
        self._local_dir = local_dir
        self._target_suffix = target_suffix
        self._need_redirect = need_redirect
        os.makedirs(self._local_dir, exist_ok=True)

        if len(target_names) == 0:
            return

        # None slices the whole sequence. The previous default of -1 silently
        # dropped the last target whenever no early_stop_func was supplied.
        limit = None
        if early_stop_func is not None:
            first_name = target_names[0]
            self.scrape_one(first_name)
            first_path = self.get_target_path(first_name)
            if not path.isfile(first_path):
                # The first page could not be fetched, so no limit can be derived.
                return
            limit = early_stop_func(self._read_text(first_path))
            # Raised explicitly (rather than via `assert`) so that the guard
            # also holds when Python runs with -O.
            if limit > 1000:
                raise AssertionError(
                    "early_stop_func returned {}, expected at most 1000".format(limit))
            if limit == 0:
                return

        self._run_pool(self.scrape_one, target_names[:limit])

    def get_target_path(self, target_name):
        """Return ``<local_dir>/<target_name>.<target_suffix>`` for the current run."""
        return path.join(self._local_dir, "{}.{}".format(target_name, self._target_suffix))

    def scrape_one(self, target_name):
        """Download one target unless its file already exists.

        Failures (no URL, request error, non-200 status) are printed and the
        target is skipped; nothing is raised for them.

        Args:
            target_name: Identifier passed to the configured ``url_func``.

        Returns:
            ``target_name``, whether or not a file was written.
        """
        target_path = self.get_target_path(target_name)
        if path.isfile(target_path):
            return target_name

        target_url = self._url_func(target_name)
        if target_url is None:
            print("Can't find source link for pubmed doc id {}".format(target_name))
            return target_name

        try:
            if self._need_redirect:
                target_url = self.expand_url(target_url)
                if target_url is None:
                    raise RuntimeError("could not resolve redirect")
            # NOTE: certificate verification is disabled (verify=False), as in
            # the rest of this module.
            with requests.Session() as session:
                response = session.get(target_url, allow_redirects=True, verify=False)
                if response.status_code != 200:
                    raise RuntimeError(
                        "unexpected HTTP status {}".format(response.status_code))
                content = response.content
        except Exception as exc:
            print("met {}({}) in request for pubmed doc {}".format(
                str(type(exc)), str(exc), target_name))
            return target_name

        with open(target_path, "wb") as html_file:
            html_file.write(content)
        return target_name

    def process_one(self, target_name):
        """Run the configured process function for one target and save it as JSON.

        The target is skipped when its output file exists and re-processing
        is not enabled.

        Args:
            target_name: Identifier passed to the configured process function.
        """
        output_path = self.get_target_path(target_name)
        if not self._reprocess and path.isfile(output_path):
            return
        obj = self._process_func(target_name)
        # Serialize before opening the file: if the object is not JSON
        # serializable, no truncated output is left behind to be mistaken for
        # a finished result on the next run.
        payload = json.dumps(obj, indent=2)
        with open(output_path, "w", encoding="utf-8") as outfile:
            outfile.write(payload)

    def process_all(self, target_names, process_func, output_dir, target_suffix="json"):
        """Process every target and store each result in ``output_dir``.

        If the first target already has an output file, it is re-generated
        and compared with the old content; when the content changed, every
        target is re-processed, otherwise existing outputs are kept.

        Args:
            target_names: Sequence of target identifiers.
            process_func: Maps a target name to a JSON-serializable object.
            output_dir: Directory that receives the outputs; created if
                missing.
            target_suffix: File extension (without dot) of the outputs.
        """
        self._process_func = process_func
        self._local_dir = output_dir
        self._target_suffix = target_suffix
        # Start from a clean flag so that an earlier call cannot force a
        # re-process of this one.
        self._reprocess = False
        os.makedirs(self._local_dir, exist_ok=True)

        if len(target_names) != 0:
            first_output_path = self.get_target_path(target_names[0])
            if path.isfile(first_output_path):
                old = self._read_text(first_output_path)
                self._reprocess = True  # force regeneration of the probe file
                self.process_one(target_name=target_names[0])
                new = self._read_text(first_output_path)
                self._reprocess = (old != new)

        self._run_pool(self.process_one, target_names)

    @classmethod
    def expand_url(cls, url):
        """Follow redirects for ``url`` and return the final URL.

        Args:
            url: URL to resolve; trailing whitespace is stripped.

        Returns:
            The final URL with trailing whitespace stripped, or ``None`` when
            the connection failed (the error is printed).
        """
        try:
            with requests.Session() as session:
                response = session.get(url.rstrip(), allow_redirects=True, verify=False)
                return response.url.rstrip()
        except requests.exceptions.ConnectionError as e:
            print(e)
            return None

    @staticmethod
    def _read_text(file_path):
        """Return the UTF-8 text content of ``file_path``."""
        with open(file_path, "r", encoding="utf-8") as text_file:
            return text_file.read()

    def _run_pool(self, worker, target_names):
        """Run ``worker`` over ``target_names`` on the pool, printing progress every 1000 items."""
        results = self._thread_pool.imap(worker, target_names, chunksize=self._chunk_size)
        for i, _ in enumerate(results):
            if i % 1000 == 0:
                print("#{}: {}".format(i, datetime.datetime.now()))