Skip to content
This repository has been archived by the owner on Jan 6, 2025. It is now read-only.

Commit

Permalink
🛠 Fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
shamhi committed Jul 20, 2024
1 parent 140315b commit ba2da07
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 105 deletions.
17 changes: 11 additions & 6 deletions bot/core/tapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def get_auth_url(self, proxy: str | None) -> str:
bot=peer,
platform='android',
from_bot_menu=False,
url='https://app.tapswap.ai/'
url='https://app.tapswap.club/'
))

auth_url = web_view.url.replace('tgWebAppVersion=6.7', 'tgWebAppVersion=7.2')
Expand Down Expand Up @@ -111,7 +111,7 @@ async def login(self, http_client: aiohttp.ClientSession, auth_url: str, proxy:
async def apply_boost(self, http_client: aiohttp.ClientSession, boost_type: str) -> bool:
response_text = ''
try:
response = await http_client.post(url='https://api.tapswap.ai/api/player/apply_boost',
response = await http_client.post(url='https://api.tapswap.club/api/player/apply_boost',
json={'type': boost_type})
response_text = await response.text()
response.raise_for_status()
Expand All @@ -127,7 +127,7 @@ async def apply_boost(self, http_client: aiohttp.ClientSession, boost_type: str)
async def upgrade_boost(self, http_client: aiohttp.ClientSession, boost_type: str) -> bool:
response_text = ''
try:
response = await http_client.post(url='https://api.tapswap.ai/api/player/upgrade',
response = await http_client.post(url='https://api.tapswap.club/api/player/upgrade',
json={'type': boost_type})
response_text = await response.text()
response.raise_for_status()
Expand All @@ -143,7 +143,7 @@ async def upgrade_boost(self, http_client: aiohttp.ClientSession, boost_type: st
async def claim_reward(self, http_client: aiohttp.ClientSession, task_id: str) -> bool:
response_text = ''
try:
response = await http_client.post(url='https://api.tapswap.ai/api/player/claim_reward',
response = await http_client.post(url='https://api.tapswap.club/api/player/claim_reward',
json={'task_id': task_id})
response_text = await response.text()
response.raise_for_status()
Expand All @@ -166,7 +166,7 @@ async def send_taps(self, http_client: aiohttp.ClientSession, taps: int) -> dict

http_client.headers['Content-Id'] = str(content_id)

response = await http_client.post(url='https://api.tapswap.ai/api/player/submit_taps', json=json_data)
response = await http_client.post(url='https://api.tapswap.club/api/player/submit_taps', json=json_data)
response_text = await response.text()
response.raise_for_status()

Expand Down Expand Up @@ -201,6 +201,9 @@ async def run(self, proxy: str | None) -> None:

auth_url = await self.get_auth_url(proxy=proxy)

if not auth_url:
return

while True:
try:
if http_client.closed:
Expand All @@ -212,7 +215,9 @@ async def run(self, proxy: str | None) -> None:
http_client = aiohttp.ClientSession(headers=headers, connector=proxy_conn)

if time() - access_token_created_time >= 1800:
profile_data, access_token = await self.login(http_client=http_client, auth_url=auth_url, proxy=proxy)
profile_data, access_token = await self.login(http_client=http_client,
auth_url=auth_url,
proxy=proxy)

if not access_token:
continue
Expand Down
179 changes: 80 additions & 99 deletions bot/utils/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import asyncio
import pathlib
from typing import Union
from contextlib import contextmanager

from pyrogram import Client
from pyrogram.types import Message
Expand All @@ -24,10 +25,7 @@


def get_session_names() -> list[str]:
session_names = glob.glob("sessions/*.session")
session_names = [
os.path.splitext(os.path.basename(file))[0] for file in session_names
]
session_names = [os.path.splitext(os.path.basename(file))[0] for file in glob.glob("sessions/*.session")]

return session_names

Expand Down Expand Up @@ -133,75 +131,67 @@ def escape_html(text: str) -> str:

options.add_argument("--headless")
options.add_argument("--log-level=3")
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
driver = None
if os.name == 'posix':
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')

session_queue = Queue()

@contextmanager
def create_webdriver():
driver = web_driver(service=web_service(webdriver_path), options=options)
try:
yield driver
finally:
driver.quit()


def extract_chq(chq: str) -> int:
global driver

if driver is None:
driver = web_driver(service=web_service(webdriver_path), options=options)

chq_length = len(chq)

bytes_array = bytearray(chq_length // 2)
xor_key = 157

for i in range(0, chq_length, 2):
bytes_array[i // 2] = int(chq[i:i + 2], 16)

xor_bytes = bytearray(t ^ xor_key for t in bytes_array)
decoded_xor = xor_bytes.decode('utf-8')

driver.execute_script("""
window.ctx = {}
window.ctx.api = {}
window.ctx.d_headers = new Map()
window.ctx.api.setHeaders = function(entries) { for (const [W, U] of Object.entries(entries)) window.ctx.d_headers.set(W, U) }
var chrStub = document.createElement("div");
chrStub.id = "_chr_";
document.body.appendChild(chrStub);
""")

fixed_xor = repr(decoded_xor).replace("`", "\\`")

chr_key = driver.execute_script(f"""
try {{
return eval(`{fixed_xor[1:-1]}`);
}} catch (e) {{
return e;
}}
""")

cache_id = driver.execute_script(f"""
try {{
return window.ctx.d_headers.get('Cache-Id');
}} catch (e) {{
return e;
}}
""")

session_queue.put(1)

if len(get_session_names()) == session_queue.qsize():
logger.info("All sessions are closed. Quitting driver...")
driver.quit()
driver = None
while session_queue.qsize() > 0:
session_queue.get()
with create_webdriver() as driver:
chq_length = len(chq)

bytes_array = bytearray(chq_length // 2)
xor_key = 157

for i in range(0, chq_length, 2):
bytes_array[i // 2] = int(chq[i:i + 2], 16)

xor_bytes = bytearray(t ^ xor_key for t in bytes_array)
decoded_xor = xor_bytes.decode('utf-8')

driver.execute_script("""
window.ctx = {}
window.ctx.api = {}
window.ctx.d_headers = new Map()
window.ctx.api.setHeaders = function(entries) { for (const [W, U] of Object.entries(entries)) window.ctx.d_headers.set(W, U) }
var chrStub = document.createElement("div");
chrStub.id = "_chr_";
document.body.appendChild(chrStub);
""")

fixed_xor = repr(decoded_xor).replace("`", "\\`")

chr_key = driver.execute_script(f"""
try {{
return eval(`{fixed_xor[1:-1]}`);
}} catch (e) {{
return e;
}}
""")

cache_id = driver.execute_script(f"""
try {{
return window.ctx.d_headers.get('Cache-Id');
}} catch (e) {{
return e;
}}
""")

return chr_key, cache_id


# Other way
def login_in_browser(auth_url: str, proxy: str) -> tuple[str, str, str]:
global driver

if driver is None:
with create_webdriver() as driver:
if proxy:
proxy_options = {
'proxy': {
Expand All @@ -214,48 +204,39 @@ def login_in_browser(auth_url: str, proxy: str) -> tuple[str, str, str]:

driver = web_driver(service=web_service(webdriver_path), options=options, seleniumwire_options=proxy_options)

driver.get(auth_url)

time.sleep(random.randint(7, 15))

try:
skip_button = driver.find_element(By.XPATH, '//*[@id="app"]/div[2]/button')
if skip_button:
skip_button.click()
time.sleep(random.randint(2, 5))
except:
...
driver.get(auth_url)

try:
coin = driver.find_element(By.XPATH, '//*[@id="ex1-layer"]')
if coin:
coin.click()
except:
...
time.sleep(random.randint(7, 15))

time.sleep(5)
try:
skip_button = driver.find_element(By.XPATH, '//*[@id="app"]/div[2]/button')
if skip_button:
skip_button.click()
time.sleep(random.randint(2, 5))
except:
...

response_text = '{}'
x_cv = '631'
x_touch = '1'
try:
coin = driver.find_element(By.XPATH, '//*[@id="ex1-layer"]')
if coin:
coin.click()
except:
...

for request in driver.requests:
request_body = request.body.decode('utf-8')
if request.url == "https://api.tapswap.ai/api/account/login" and 'chr' in request_body:
response_text = request.response.body.decode('utf-8')
time.sleep(5)

if request.url == "https://api.tapswap.ai/api/player/submit_taps":
headers = dict(request.headers.items())
x_cv = headers.get('X-Cv') or headers.get('x-cv')
x_touch = headers.get('X-Touch', '') or headers.get('x-touch', '')
response_text = '{}'
x_cv = '651'
x_touch = '1'

session_queue.put(1)
for request in driver.requests:
request_body = request.body.decode('utf-8')
if request.url == "https://api.tapswap.club/api/account/challenge" and 'chr' in request_body:
response_text = request.response.body.decode('utf-8')

if len(get_session_names()) == session_queue.qsize():
logger.info("All sessions are closed. Quitting driver...")
driver.quit()
driver = None
while session_queue.qsize() > 0:
session_queue.get()
if request.url == "https://api.tapswap.club/api/player/submit_taps":
headers = dict(request.headers.items())
x_cv = headers.get('X-Cv') or headers.get('x-cv')
x_touch = headers.get('X-Touch', '') or headers.get('x-touch', '')

return response_text, x_cv, x_touch

0 comments on commit ba2da07

Please sign in to comment.