-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
124 lines (102 loc) · 5.41 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import asyncio
import os
import shutil
import tempfile
import zipfile

import requests
from lxml import etree
from telegram import InputFile, Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
# Bot credentials are read from the environment; None when the variable is unset.
TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN')

# Upper bound, in bytes, applied to uploads and to extracted content (32 MB).
ALLOWED_FILE_SIZE = 32 * 1024 ** 2

# Archives holding more entries than this are rejected.
MAX_FILES_IN_ARCHIVE = 1
def get_file_path(file_id, timeout=30):
    """Resolve a Telegram ``file_id`` to a direct download URL.

    Calls the Bot API ``getFile`` method and builds the download URL from the
    ``file_path`` field of the response.

    Args:
        file_id: Identifier of the file as reported by Telegram.
        timeout: Seconds to wait for the Bot API before giving up.

    Returns:
        The download URL as a string, or ``None`` when the request fails or
        the response does not contain a file path.
    """
    try:
        response = requests.get(
            f"https://api.telegram.org/bot{TOKEN}/getFile",
            # Let requests URL-encode the value instead of interpolating it.
            params={"file_id": file_id},
            # Without a timeout a stalled connection would hang forever.
            timeout=timeout,
        )
        response.raise_for_status()
        result = response.json()
        return f"https://api.telegram.org/file/bot{TOKEN}/{result['result']['file_path']}"
    except requests.RequestException as e:
        # The exception text contains the request URL, which embeds the bot
        # token, so only the exception type and HTTP status are logged.
        status = getattr(getattr(e, "response", None), "status_code", None)
        print(f"Error fetching file path: {type(e).__name__} (HTTP status: {status})")
        return None
    except (KeyError, TypeError, ValueError) as e:
        # Non-JSON body, or a JSON body without result.file_path.
        print(f"Error fetching file path: unexpected response ({type(e).__name__})")
        return None
async def send_reply(update: Update, context: ContextTypes.DEFAULT_TYPE, text: str) -> None:
    """Answer the message carried by *update* with the plain-text *text*.

    *context* is accepted for signature parity with the handlers but is not
    used here.
    """
    incoming_message = update.message
    await incoming_message.reply_text(text)
def _download_to_temp_file(file_url, timeout=60):
    """Stream *file_url* into a new temporary file and return the file's path.

    The temporary file is removed again if the download fails, so a path is
    only ever handed to the caller for a completed download.

    Raises:
        requests.RequestException: On connection errors, timeouts or a
            non-success HTTP status.
    """
    tmp = tempfile.NamedTemporaryFile(delete=False)
    try:
        # Both context managers are closed before the except clause runs, so
        # the file is no longer open when it is removed.
        with tmp, requests.get(file_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=8192):
                tmp.write(chunk)
    except BaseException:
        os.remove(tmp.name)
        raise
    return tmp.name


def _build_fb2_caption(fb2_file):
    """Return the caption for an extracted book, including its title if readable.

    The title lookup is purely cosmetic: any parsing problem falls back to the
    generic caption instead of failing the whole request.
    """
    default_caption = 'Your .fb2 file has been successfully extracted from the archive! 📚 Enjoy reading!'
    # The XML comes from an untrusted upload. Entity resolution is disabled so
    # a crafted document cannot pull local file contents into the caption.
    parser = etree.XMLParser(resolve_entities=False, no_network=True)
    ns = {'fb': 'http://www.gribuser.ru/xml/fictionbook/2.0'}
    try:
        tree = etree.parse(fb2_file, parser)
        book_title = tree.findtext('.//fb:book-title', namespaces=ns)
    except Exception:
        # Deliberate best-effort: a malformed book is still sent, just untitled.
        return default_caption
    if book_title:
        return f'Your .fb2 file "{book_title}" has been successfully extracted from the archive! 📚 Enjoy reading!'
    return default_caption


async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Extract the first ``.fb2`` file from an uploaded zip archive and send it back.

    Steps: check the reported upload size, download the file, validate the
    archive (entry count and total uncompressed size), extract it into a
    temporary directory, then reply with the first ``.fb2`` file found. Every
    rejection is reported to the user with a text reply.

    Args:
        update: Incoming update; its message is expected to carry a document.
        context: Handler context, passed through to ``send_reply``.
    """
    document = update.message.document
    file_size = document.file_size
    # The size may be missing; only reject when it is known to be too large.
    if file_size is not None and file_size > ALLOWED_FILE_SIZE:
        await send_reply(update, context, 'The file exceeds the allowed size of 32 MB.')
        return

    # requests is blocking, so network calls run in the default executor to
    # keep the event loop free for other updates.
    loop = asyncio.get_running_loop()
    file_url = await loop.run_in_executor(None, get_file_path, document.file_id)
    if not file_url:
        await send_reply(update, context, 'Failed to retrieve the file. Please try again.')
        return

    tmp_path = None
    try:
        try:
            tmp_path = await loop.run_in_executor(None, _download_to_temp_file, file_url)
        except requests.RequestException:
            await send_reply(update, context, 'Failed to retrieve the file. Please try again.')
            return

        with tempfile.TemporaryDirectory() as extract_path:
            try:
                with zipfile.ZipFile(tmp_path, 'r') as zip_ref:
                    entries = zip_ref.infolist()
                    if len(entries) > MAX_FILES_IN_ARCHIVE:
                        await send_reply(update, context, f'The archive contains more than {MAX_FILES_IN_ARCHIVE} files, which is not allowed.')
                        return
                    total_uncompressed_size = sum(zinfo.file_size for zinfo in entries)
                    if total_uncompressed_size > ALLOWED_FILE_SIZE:
                        # The enforced limit is ALLOWED_FILE_SIZE (32 MB), so the message says 32 MB.
                        await send_reply(update, context, 'The total uncompressed size of the archive exceeds the allowed limit of 32 MB.')
                        return
                    zip_ref.extractall(extract_path)
            except zipfile.BadZipFile:
                await send_reply(update, context, 'The file is not a valid zip archive.')
                return

            fb2_found = False
            for root, _, files in os.walk(extract_path):
                for file in files:
                    # Case-insensitive so that e.g. "BOOK.FB2" is recognised too.
                    if not file.lower().endswith('.fb2'):
                        continue
                    fb2_path = os.path.join(root, file)
                    if os.path.getsize(fb2_path) <= ALLOWED_FILE_SIZE:
                        with open(fb2_path, 'rb') as fb2_file:
                            caption = _build_fb2_caption(fb2_file)
                            # Rewind: parsing consumed the stream, the upload needs it from the start.
                            fb2_file.seek(0)
                            await update.message.reply_document(
                                document=InputFile(fb2_file),
                                caption=caption
                            )
                        fb2_found = True
                    else:
                        await send_reply(update, context, 'The extracted .fb2 file exceeds the allowed size of 32 MB.')
                    # Only the first .fb2 file in a directory is considered.
                    break
                if fb2_found:
                    break
            if not fb2_found:
                await send_reply(update, context, 'No valid .fb2 file found in the archive.')
    finally:
        # Single cleanup point: runs on every return path and on exceptions,
        # after the archive has been closed.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except FileNotFoundError:
                pass
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to the /start command with the bot's welcome message."""
    welcome_text = (
        "Welcome to the Unzip Bot! 🤖\n"
        "I'm here to help you extract .fb2 files from zip archives. "
        "Just send me a zip file, and I'll handle the rest. "
        "If there are multiple .fb2 files, I'll extract the first one I find."
    )
    await send_reply(update, context, welcome_text)
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to a text message with a hint to send a zip file instead."""
    hint = "Just send me a zip file, and I'll handle the rest."
    await send_reply(update, context, hint)
def main() -> None:
    """Build the bot application, register its handlers and start polling."""
    app = Application.builder().token(TOKEN).build()

    # Registration order is kept: documents, then /start, then plain text.
    handlers = (
        MessageHandler(filters.Document.ALL, handle_document),
        CommandHandler('start', start),
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text),
    )
    for handler in handlers:
        app.add_handler(handler)

    app.run_polling()
# Start the bot only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()