-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathapple_appstore.py
184 lines (141 loc) · 5.83 KB
/
apple_appstore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""Provides tasks for downloading all Apple App Store reviews about the app."""
import datetime as dt
import json
import random
from time import sleep
import luigi
from luigi.format import UTF8
import pandas as pd
import requests
import xmltodict
from _utils import CsvToDb, DataPreparationTask, MuseumFacts, logger
class AppstoreReviewsToDb(CsvToDb):
    """Persist the downloaded App Store reviews into the database."""

    table = 'appstore_review'

    def requires(self):
        """Depend on the fetch task that produces the review CSV."""
        return FetchAppstoreReviews()
class FetchAppstoreReviews(DataPreparationTask):
    """
    Download all reviews related to the museum app from the Apple App Store.

    The data is accessed by scanning the public customer-reviews RSS feed,
    one country at a time, following the ``rel="next"`` pagination links.
    """

    table = 'appstore_review'

    # Soft throttle for itunes.apple.com, enforced by get_metered_request.
    requests_per_minute = 20
    # Luigi worker timeout (seconds); scanning every country feed is slow.
    worker_timeout = 1200

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Timestamp of the most recent outgoing request, used for rate
        # limiting. dt.datetime.min makes the very first request immediate.
        self.latest_request_time = dt.datetime.min

    def requires(self):
        return MuseumFacts()

    def output(self):
        return luigi.LocalTarget(
            f'{self.output_dir}/appstore_reviews.csv', format=UTF8)

    def run(self):
        """Fetch reviews for all countries and write them to a CSV file."""
        reviews = self.fetch_all()
        logger.info("storing results")
        with self.output().open('w') as output_file:
            reviews.to_csv(output_file, index=False, header=True)

    def fetch_all(self):
        """
        Fetch reviews for every known country code.

        Returns a DataFrame deduplicated on (app_id, appstore_review_id).
        The frame may have zero rows when no country returned any review.
        """
        data = []
        country_codes = sorted(self.get_country_codes())
        if self.minimal_mode:
            # Sample two adjacent country codes to keep test runs fast.
            # max(0, ...) guards against lists shorter than two entries,
            # where randint(0, -1) would raise ValueError.
            random_num = random.randint(
                0, max(0, len(country_codes) - 2))  # nosec
            country_codes = country_codes[random_num:random_num + 2]
            country_codes.append('CA')
        tbar = self.tqdm(country_codes, desc="Fetching appstore reviews")
        for country_code in tbar:
            tbar.set_description(
                f"Fetching appstore reviews ({country_code})"
            )
            try:
                data_for_country = self.fetch_for_country(country_code)
                if not data_for_country.empty:
                    data.append(data_for_country)
                logger.debug(f'Fetching appstore reviews for {country_code}')
            except requests.HTTPError as error:
                if error.response.status_code == 400:
                    # not all countries are available
                    pass
                else:
                    raise
        try:
            ret = pd.concat(data)
        except ValueError:
            # No reviews at all. Declare the key columns explicitly so that
            # the drop_duplicates() below does not fail with a KeyError on
            # the empty frame.
            ret = pd.DataFrame(columns=['app_id', 'appstore_review_id'])
        return ret.drop_duplicates(subset=['app_id', 'appstore_review_id'])

    def get_country_codes(self):
        """Return all ISO country codes known to the country.io service."""
        # Use HTTPS (the endpoint supports it) and a timeout so a stalled
        # connection cannot hang the whole worker.
        return requests.get(
            'https://country.io/names.json', timeout=60).json().keys()

    def fetch_for_country(self, country_code):
        """
        Fetch all review pages for a single country.

        Returns a DataFrame with one row per review; it has zero rows when
        the country has no reviews. Raises requests.HTTPError for
        unexpected HTTP failures (503/403/404 are tolerated for most
        countries, see below).
        """
        with self.input().open('r') as facts_file:
            facts = json.load(facts_file)
        app_id = facts['ids']['apple']['appId']
        url = (f'https://itunes.apple.com/{country_code}/rss/customerreviews/'
               f'page=1/id={app_id}/sortby=mostrecent/xml')
        data_list = []
        while url:
            try:
                data, url = self.fetch_page(url)
                data_list += data
            except requests.HTTPError as error:
                # The store sporadically answers 503, and 403/404 for
                # less relevant countries; treat those as "no more data"
                # rather than failing the whole task.
                if error.response is not None and (
                    error.response.status_code == 503 or (
                        error.response.status_code in {403, 404}
                        and country_code not in {'DE', 'US', 'GB'})):
                    logger.error(f"Encountered {error.response.status_code} "
                                 f"server error '{error}' for country code "
                                 f"'{country_code}'")
                    logger.error("Continuing anyway...")
                    break
                else:
                    raise
        if not data_list:
            # no reviews for the given country code
            logger.debug(f"Empty data for country {country_code}")
        result = pd.DataFrame(data_list)
        result['country_code'] = country_code
        result.insert(0, 'app_id', app_id)
        return result

    def fetch_page(self, url):
        """
        Fetch and parse a single RSS page.

        Returns a tuple (reviews, next_page_url); next_page_url is None
        once the last page has been reached.
        """
        response = self.get_metered_request(url)
        response.raise_for_status()
        # specify encoding explicitly because the autodetection fails sometimes
        response.encoding = 'utf-8'
        response_content = xmltodict.parse(response.text)['feed']
        if 'entry' not in response_content:
            return [], None
        entries = response_content['entry']
        # xmltodict collapses a single repeated element into a plain dict
        if isinstance(entries, dict):
            entries = [entries]
        data = [
            {
                'appstore_review_id': item['id'],
                'text': self.find_first_conditional_tag(
                    item['content'],
                    lambda each: each['@type'] == 'text')['#text'],
                'rating': item['im:rating'],
                'app_version': item['im:version'],
                'vote_count': item['im:voteCount'],
                'vote_sum': item['im:voteSum'],
                'title': item['title'],
                'date': item['updated']
            }
            for item in entries
        ]
        # read <link rel="next"> which contains the link to the next page
        links = response_content['link']
        # same single-element collapse as for 'entry' above
        if isinstance(links, dict):
            links = [links]
        try:
            next_page_url = self.find_first_conditional_tag(
                links,
                lambda each: each['@rel'] == 'next')['@href']
        except StopIteration:
            # the last page carries no rel="next" link
            next_page_url = None
        return data, next_page_url

    # for when there are multiple 'contents'-elements in our response
    def find_first_conditional_tag(self, tags, condition):
        """Return the first tag matching condition; raise StopIteration
        when no tag matches."""
        return next(each for each in tags if condition(each))

    def get_metered_request(self, *args, **kwargs):
        """Issue requests.get, sleeping first so that no more than
        requests_per_minute requests are sent per minute."""
        sleep(max(0, (
            60 / self.requests_per_minute - (
                dt.datetime.now() - self.latest_request_time
            ).total_seconds())
        ))
        try:
            return requests.get(*args, **kwargs)
        finally:
            # Record the attempt even on failure so errors are throttled too.
            self.latest_request_time = dt.datetime.now()