-
Notifications
You must be signed in to change notification settings - Fork 1
/
ajax.py
134 lines (111 loc) · 4.31 KB
/
ajax.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import asyncio
from bs4 import BeautifulSoup
from megadebrid.libs.base import MegaDebrid
class MegaDebridAjax(MegaDebrid):
    """
    Mega-Debrid AJAX: provide methods that perform the same actions as the
    website's AJAX backend.

    Every request goes to ``self.base_url`` with an ``ajax`` query parameter
    that selects the action. JSON replies are decoded with
    ``content_type="text/html"`` as the expected content type (presumably the
    server labels its JSON payloads as HTML -- confirm against the live API).

    ``self.session`` and ``self.base_url`` are not defined here; they are
    expected to be provided by ``MegaDebrid``.
    """

    def __init__(self) -> None:
        """Initialise the base client in AJAX mode and check the session state.

        Raises:
            Exception: propagated from :meth:`is_authenticated` when the
                session lacks a user agent header or a ``PHPSESSID`` cookie.
        """
        super().__init__(is_ajax=True)
        self.is_authenticated()

    @property
    def ajax_url(self) -> str:
        """Return ``self.base_url`` followed by a trailing slash."""
        return f"{ self.base_url }/"

    def is_authenticated(self) -> None:
        """Verify that the session carries what an authenticated call needs.

        Raises:
            Exception: if the session headers have no ``USER-AGENT`` entry, or
                if the cookie jar holds no non-empty ``PHPSESSID`` cookie.
        """
        if not self.session.headers.get("USER-AGENT"):
            raise Exception(
                "Require the same USER-AGENT that was used for authentication."
            )
        # Flatten the jar into name -> value; a later cookie with the same
        # name overrides an earlier one.
        cookies = {cookie.key: cookie.value for cookie in self.session.cookie_jar}
        if not cookies.get("PHPSESSID"):
            raise Exception(
                "Could not authenticate on Mega-Debrid.eu: require to have PHPSESSID define."
            )

    async def _post_json(self, params: dict, data: dict) -> dict:
        """POST ``data`` to ``self.base_url`` with ``params`` and decode the JSON reply."""
        async with self.session.post(
            self.base_url, params=params, data=data
        ) as response:
            return await response.json(content_type="text/html")

    async def get_torrents_list(self) -> dict:
        """
        GET the JSON list of my torrents in the seedbox
        (``ajax=getMyTorrents``), as shown on
        https://www.mega-debrid.eu/index.php?page=torrents

        Returns:
            The decoded JSON reply.
        """
        params = {"ajax": "getMyTorrents"}
        async with self.session.get(self.base_url, params=params) as response:
            return await response.json(content_type="text/html")

    async def get_torrent_status(self, torrent_id: str) -> dict:
        """
        POST torrent id(s) to get the current status of the elements being
        uploaded on the seedbox
        on https://www.mega-debrid.eu/index.php?ajax=statusTorrent

        Args:
            torrent_id: value sent as the ``torrentId[]`` form field.

        Returns:
            The decoded JSON reply.
        """
        return await self._post_json(
            {"ajax": "statusTorrent"},
            {"torrentId[]": torrent_id},
        )

    async def upload_magnet(self, magnet: str, split_size_file: int = 0) -> dict:
        """
        POST a magnet link on the seedbox to be processed
        on https://www.mega-debrid.eu/index.php?ajax=uploadMagnet

        Args:
            magnet: the magnet link, sent as the ``magnet`` form field.
            split_size_file: value sent as the ``splitSizeFile`` form field;
                the default ``0`` matches the value that was previously
                hard-coded.

        Returns:
            The decoded JSON reply.
        """
        data = {
            "magnet": magnet,
            "splitSizeFile": str(split_size_file),
        }
        return await self._post_json({"ajax": "uploadMagnet"}, data)

    async def upload_torrent(self, torrent: str, split_size_file: int = 0) -> dict:
        """
        POST a torrent file on the seedbox to be processed
        on https://www.mega-debrid.eu/index.php?ajax=uploadTorrent

        Args:
            torrent: path of the torrent file on the local filesystem.
            split_size_file: value sent as the ``splitSizeFile`` form field.

        Returns:
            The decoded JSON reply.

        Raises:
            OSError: if the torrent file cannot be opened.
        """
        params = {"ajax": "uploadTorrent"}
        # Keep the file open only for the duration of the request so the
        # handle is released even when the upload fails.
        with open(torrent, "rb") as torrent_file:
            data = {
                # Sent as text: the file object makes this a multipart form,
                # whose fields must be str/bytes/file-like rather than int.
                "splitSizeFile": str(split_size_file),
                "torrent": torrent_file,
            }
            return await self._post_json(params, data)

    async def remove_torrent(self, torrent_id: str) -> dict:
        """
        POST a torrent id to remove the torrent from the list of the seedbox
        on https://www.mega-debrid.eu/index.php?ajax=removeTorrent

        Args:
            torrent_id: value sent as the ``id`` form field.

        Returns:
            The decoded JSON reply.
        """
        return await self._post_json({"ajax": "removeTorrent"}, {"id": torrent_id})

    async def debrid_link(self, link: str, password: str = "") -> dict:
        """
        POST a link to be debrided, in two steps:

        1. ``ajax=xhr_debrid`` returns an HTML fragment; the element with id
           ``debrid_0`` carries a ``data-code`` attribute.
        2. ``ajax=debrid`` (with ``json=1``) is called with that code and
           returns the JSON result.

        Args:
            link: the link to debrid, sent as the ``link`` form field.
            password: sent as the ``password`` form field (empty by default).

        Returns:
            The decoded JSON reply of the second request.

        Raises:
            Exception: if the first reply has no ``debrid_0`` element or that
                element has no ``data-code`` attribute.
        """
        params = {
            "ajax": "xhr_debrid",
            "onlyLinks": "false",
        }
        data = {
            "link": link,
            "i": "0",
            "password": password,
        }
        async with self.session.post(
            self.base_url, params=params, data=data
        ) as response:
            html_body = await response.text()

        soup = BeautifulSoup(html_body, "html.parser")
        debrid_element = soup.find(id="debrid_0")
        debrid_code = (
            debrid_element.get("data-code") if debrid_element is not None else None
        )
        if debrid_code is None:
            # Fail with an explicit message instead of an opaque
            # "'NoneType' object is not subscriptable".
            raise Exception(
                "Could not debrid the link: no debrid code found in the "
                "Mega-Debrid.eu response."
            )

        params = {
            "ajax": "debrid",
            "json": "1",
        }
        return await self._post_json(params, {"code": debrid_code})