-
-
Notifications
You must be signed in to change notification settings - Fork 12
/
get_package_list.py
117 lines (93 loc) · 2.91 KB
/
get_package_list.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import contextlib
import random
import sys
from datetime import timedelta
import joblib
import requests
from joblib import Parallel, delayed
from requests_cache import CachedSession
from tqdm import tqdm
@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
"""Context manager to patch joblib to report into tqdm progress bar given as argument"""
class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
def __call__(self, *args, **kwargs):
tqdm_object.update(n=self.batch_size)
return super().__call__(*args, **kwargs)
old_batch_callback = joblib.parallel.BatchCompletionCallBack
joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
try:
yield tqdm_object
finally:
joblib.parallel.BatchCompletionCallBack = old_batch_callback
tqdm_object.close()
def checkGame(game):
session = CachedSession(
'steam_cache',
use_cache_dir=True,
cache_control=False,
allowable_methods=['GET'],
allowable_codes=[200],
match_headers=False,
stale_if_error=False,
)
try:
res = session.get(
url='https://store.steampowered.com/api/appdetails/?appids=' +
str(game) + '&cc=EE&l=english&v=1',
proxies={
'http': 'socks5h://p.webshare.io:9999',
'https': 'socks5h://p.webshare.io:9999'
},
expire_after=timedelta(hours=random.randint(20, 24)))
except Exception as e:
print('\nGot exception while trying to send request %s' % e)
return checkGame(game)
if res.status_code != 200:
print('\nGot wrong status code %d' % res.status_code)
return checkGame(game)
try:
res = res.json()
except Exception as e:
# print('\nGot exception while trying to decode JSON %s' % e)
return None
if res is None:
print('\nGot invalid response %d' % game)
return checkGame(game)
if res[str(game)]['success'] is False:
# print('\nGot invalid app %d' % game)
return None
if res[str(game)]['data']['release_date']['coming_soon'] is True:
# print('\nGot not yet released app %d' % game)
return None
if res[str(game)]['data']['is_free'] is True:
# print('\nGot free app %d' % game)
return game
if res[str(game)]['data']['is_free'] is False:
# print('\nGot paid app %d' % game)
return None
print('Got faulty response %s' % res)
return None
res = requests.get(
url='http://api.steampowered.com/ISteamApps/GetAppList/v2').json()
apps = []
for app in res['applist']['apps']:
apps.append(app['appid'])
random.shuffle(apps)
print('Received %d apps' % len(apps))
with tqdm_joblib(
tqdm(desc='Requesting app statuses', total=len(apps),
mininterval=10)) as progress_bar:
results = Parallel(n_jobs=20)(delayed(checkGame)(i) for i in apps)
output = ''
for game in results:
if game is not None:
output += str(game) + ','
output = output[:-1]
with open('package_list.txt', 'w') as f:
f.write(output)
if len(output) > 10000:
print("\nRan successfully")
else:
print("\nSomething went wrong")
sys.exit(1)