This repository has been archived by the owner on Mar 20, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
main.py
233 lines (177 loc) · 8.01 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
from support import functions as f
from support import ui_elements as ui
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import glob
import os
import csv
import json
import requests
import time
# Load config parameters
with open('config.json') as config:
CONFIG = json.load(config)
# Create url to get all profiles, get all profiles
list_url = '{}:{}/api/v1/user/list?page_size={}'.format(CONFIG['ADSPOWER_URL'],
CONFIG['ADSPOWER_PORT'],
CONFIG['PAGE_SIZE'],)
resp_list = requests.get(list_url).json()
# Get profiles which already done
list_of_files = os.listdir(CONFIG['FOLDER_FOR_KEYS'])
list_of_names = [filename.split('.')[0] for filename in list_of_files]
# ---------------------------------------------------------------------------------------
# Start processing each profile
for profile in resp_list['data']['list']:
# Check if the profile has been processed before
if profile['user_id'] in list_of_names:
print('{} already processed'.format(profile['user_id']))
continue
# TODO: add empty response and exception handling
# TODO: add exception handling (logs)
CURRENT_RECORD = []
open_url = '{}:{}/api/v1/browser/start?user_id={}'.format(CONFIG['ADSPOWER_URL'],
CONFIG['ADSPOWER_PORT'],
profile['user_id'])
close_url = '{}:{}/api/v1/browser/stop?user_id={}'.format(CONFIG['ADSPOWER_URL'],
CONFIG['ADSPOWER_PORT'],
profile['user_id'])
resp = requests.get(open_url).json()
CURRENT_RECORD.append(profile['user_id'])
# Set up selenium
chrome_driver = resp["data"]["webdriver"]
chrome_options = Options()
chrome_options.add_experimental_option("debuggerAddress", resp["data"]["ws"]["selenium"])
driver = webdriver.Chrome(chrome_driver, options=chrome_options)
# ---------------------------------------------------------------------------------------
# Set up Metamask
# 0 - Create wallet
# 1 - Login
driver.get(CONFIG['METAMASK_URL'])
# Close other tabs
f.close_other_tabs(driver)
# Try to create wallet
driver.get(CONFIG['METAMASK_URL'])
try:
flow_flag = f.check_element(driver, ui.xpath_getstarted)
if flow_flag:
# GetStarted element already defined
f.click_element(driver, ui.xpath_getstarted)
# Click Create button
f.click_element(driver, ui.xpath_create1)
# Click No, thanks button
f.click_element(driver, ui.xpath_nothanks)
# Enter a new password
f.sendkeys_element(driver, ui.xpath_newpass, CONFIG['DEFAULT_PASS'])
# Enter a new password once again (confirm)
f.sendkeys_element(driver, ui.xpath_confirmpass, CONFIG['DEFAULT_PASS'])
# Click checkbox I have read terms
f.click_element(driver, ui.xpath_ihaveread)
# Click Create button
f.click_element(driver, ui.xpath_create2)
# Click Next button
f.click_element(driver, ui.xpath_mmnext)
# Download secret key (seed)
f.click_element(driver, ui.xpath_download)
# Click Remind me later (do not reveal seed)
f.click_element(driver, ui.xpath_later)
# Rename a downloaded file (wit seed)
search_pattern = CONFIG['DOWNLOADS_FOLDER'] + '/*.txt'
list_of_files = glob.glob(search_pattern)
latest_file = max(list_of_files, key=os.path.getctime)
# Absolute path to the file
new_name = CONFIG['FOLDER_FOR_KEYS'] + '/' + profile['user_id'] + '.txt'
os.rename(latest_file, new_name)
except Exception as err:
print(err.args)
time.sleep(CONFIG['GLOBAL_SLEEP'])
pass
# Login (if required)
try:
flow_flag = f.check_element(driver, ui.xpath_password)
if flow_flag:
f.sendkeys_element(driver, ui.xpath_password, CONFIG['DEFAULT_PASS'])
f.sendkeys_element(driver, ui.xpath_password, Keys.RETURN)
except Exception as err:
print(err.args)
time.sleep(CONFIG['GLOBAL_SLEEP'])
pass
# Launch DEX (Refer & Earn section)
driver.maximize_window()
driver.get(CONFIG['DEX_URL'])
# ---------------------------------------------------------------------------------------
# Connect wallet (if not connected) and select Metamask
try:
flow_flag = f.check_element(driver, ui.xpath_connect)
if flow_flag:
f.click_element(driver, ui.xpath_connect)
# Select Metamask
flow_flag = f.check_element(driver, ui.xpath_mmbutton)
f.click_element(driver, ui.xpath_mmbutton) if flow_flag else None
except Exception as err:
print(err.args)
time.sleep(CONFIG['GLOBAL_SLEEP'])
pass
# ---------------------------------------------------------------------------------------
# Switch to Polygon (if not already switched)
try:
flow_flag = f.check_element(driver, ui.xpath_switch)
if flow_flag:
f.click_element(driver, ui.xpath_switch)
# Go to metamask html page (we can't work with Metamask unless we open its popup as a new tab)
driver.get(CONFIG['METAMASK_URL'])
# Approve adding a network (this popup appears if Polygon not saved in Metamask)
# (press Approve button)
flow_flag = f.check_element(driver, ui.xpath_approve)
f.click_element(driver, ui.xpath_approve) if flow_flag else None
# Switch to Polygon (press Switch network button)
f.click_element(driver, ui.xpath_switchmm)
except Exception as err:
print(err.args)
time.sleep(CONFIG['GLOBAL_SLEEP'])
pass
# ---------------------------------------------------------------------------------------
# Go back to Polysynth and enter a referral code
# TODO: replace with smart wait
driver.get(CONFIG['DEX_URL'])
# Connect wallet (if not connected) and select Metamask
try:
flow_flag = f.check_element(driver, ui.xpath_connect)
if flow_flag:
# Click Connect button
f.click_element(driver, ui.xpath_connect)
# Select Metamask in pop-up
flow_flag = f.check_element(driver, ui.xpath_mmbutton)
f.click_element(driver, ui.xpath_mmbutton) if flow_flag else None
# Select account
driver.get(CONFIG['METAMASK_URL'])
flow_flag = f.check_element(driver, ui.xpath_nextbutton)
if flow_flag:
# Click Next button (approve connection to accounts picked by default)
f.click_element(driver, ui.xpath_nextbutton)
# Click Connect
f.click_element(driver, ui.xpath_mmconnect)
except Exception as err:
print(err.args)
time.sleep(CONFIG['GLOBAL_SLEEP'])
pass
# Click the link
driver.get(CONFIG['DEX_URL'])
f.click_element(driver, ui.xpath_reflink)
# Enter the referral code
f.sendkeys_element(driver, ui.xpath_refcode, CONFIG['REF_CODE'])
# Submit
# TODO: save a screenshot
flow_flag = f.check_element(driver, ui.xpath_submit)
f.click_element(driver, ui.xpath_submit) if flow_flag else None
# TODO: check entered value and errors
elem_yourrefcode = driver.find_element(By.XPATH, ui.xpath_yourrefcode)
# Save public address
CURRENT_RECORD.append(elem_yourrefcode.text)
# Done! Quit and go to the next one
driver.quit()
requests.get(close_url)
with open('processed_profiles.csv', 'a', newline="") as file:
writer = csv.writer(file)
writer.writerow(CURRENT_RECORD)