diff --git a/bot/__init__.py b/bot/__init__.py index aee06bae824..a8d09c62fd2 100644 --- a/bot/__init__.py +++ b/bot/__init__.py @@ -74,6 +74,7 @@ queue_dict_lock = Lock() qb_listener_lock = Lock() cpu_eater_lock = Lock() +subprocess_lock = Lock() status_dict = {} task_dict = {} rss_dict = {} diff --git a/bot/__main__.py b/bot/__main__.py index 0d819263a80..9791aff193d 100644 --- a/bot/__main__.py +++ b/bot/__main__.py @@ -19,6 +19,7 @@ from bot import ( bot, + user, botStartTime, LOGGER, Interval, @@ -120,6 +121,8 @@ async def restart(_, message): proc1 = await create_subprocess_exec( "pkill", "-9", "-f", "gunicorn|aria2c|qbittorrent-nox|ffmpeg|rclone" ) + if user: + await user.stop() proc2 = await create_subprocess_exec("python3", "update.py") await gather(proc1.wait(), proc2.wait()) async with aiopen(".restartmsg", "w") as f: @@ -228,7 +231,10 @@ async def send_incompelete_task_message(cid, msg): async def main(): await gather( - start_cleanup(), torrent_search.initiate_search_tools(), restart_notification(), initiate_help_messages() + start_cleanup(), + torrent_search.initiate_search_tools(), + restart_notification(), + initiate_help_messages(), ) await sync_to_async(start_aria2_listener, wait=False) diff --git a/bot/helper/common.py b/bot/helper/common.py index 235f13fb3d5..7660922fbf8 100644 --- a/bot/helper/common.py +++ b/bot/helper/common.py @@ -16,6 +16,7 @@ task_dict, GLOBAL_EXTENSION_FILTER, cpu_eater_lock, + subprocess_lock, ) from bot.helper.telegram_helper.bot_commands import BotCommands from bot.helper.ext_utils.bot_utils import new_task, sync_to_async @@ -428,13 +429,10 @@ async def proceedExtract(self, dl_path, size, gid): ] if not pswd: del cmd[2] - if ( - self.suproc == "cancelled" - or self.suproc is not None - and self.suproc.returncode == -9 - ): - return False - self.suproc = await create_subprocess_exec(*cmd) + async with subprocess_lock: + if self.suproc == "cancelled": + return False + self.suproc = await create_subprocess_exec(*cmd) _, stderr = await self.suproc.communicate() code = self.suproc.returncode if code == -9: @@ -473,9 +471,10 @@ async def proceedExtract(self, dl_path, size, gid): ] if not pswd: del cmd[2] - if self.suproc == "cancelled": - return False - self.suproc = await create_subprocess_exec(*cmd) + async with subprocess_lock: + if self.suproc == "cancelled": + return False + self.suproc = await create_subprocess_exec(*cmd) _, stderr = await self.suproc.communicate() code = self.suproc.returncode if code == -9: @@ -538,9 +537,10 @@ async def proceedCompress(self, dl_path, size, gid): if not pswd: del cmd[3] LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}") - if self.suproc == "cancelled": - return False - self.suproc = await create_subprocess_exec(*cmd) + async with subprocess_lock: + if self.suproc == "cancelled": + return False + self.suproc = await create_subprocess_exec(*cmd) _, stderr = await self.suproc.communicate() code = self.suproc.returncode if code == -9: @@ -567,7 +567,7 @@ async def proceedSplit(self, up_dir, m_size, o_files, size, gid): task_dict[self.mid] = SplitStatus(self, size, gid) LOGGER.info(f"Splitting: {self.name}") res = await split_file( - f_path, f_size, file_, dirpath, self.splitSize, self + f_path, f_size, dirpath, self.splitSize, self ) if not res: return False @@ -612,9 +612,7 @@ async def generateSampleVideo(self, dl_path, size, gid): ) return res else: - for dirpath, _, files in await sync_to_async( - walk, dl_path, topdown=False - ): + for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False): for file_ in files: f_path = ospath.join(dirpath, file_) if (await get_document_type(f_path))[0]: diff --git a/bot/helper/ext_utils/media_utils.py b/bot/helper/ext_utils/media_utils.py index bc6d8a67558..18d0892d1a9 100644 --- a/bot/helper/ext_utils/media_utils.py +++ b/bot/helper/ext_utils/media_utils.py @@ -1,5 +1,5 @@ -from os import path as ospath -from aiofiles.os import remove as aioremove, path as aiopath, mkdir +from os import path as ospath, cpu_count +from aiofiles.os import remove as aioremove, path as aiopath, makedirs from time import time from re import search as re_search from asyncio import create_subprocess_exec, gather @@ -7,7 +7,7 @@ from PIL import Image from aioshutil import move -from bot import LOGGER +from bot import LOGGER, subprocess_lock from bot.helper.ext_utils.bot_utils import cmd_exec from bot.helper.ext_utils.bot_utils import sync_to_async from bot.helper.ext_utils.files_utils import ARCH_EXT, get_mime_type @@ -30,8 +30,7 @@ async def createThumb(msg, _id=""): if not _id: _id = msg.id path = "Thumbnails/" - if not await aiopath.isdir(path): - await mkdir(path) + await makedirs(path, exist_ok=True) photo_dir = await msg.download() des_dir = f"{path}{_id}.jpg" await sync_to_async(Image.open(photo_dir).convert("RGB").save, des_dir, "JPEG") @@ -154,21 +153,20 @@ async def take_ss(video_file, ss_nb) -> list: duration = (await get_media_info(video_file))[0] if duration != 0: dirpath, name = video_file.rsplit("/", 1) - name = name.rsplit(".", 1)[0] + name, _ = ospath.splitext(name) dirpath = f"{dirpath}/screenshots/" - if not await aiopath.exists(dirpath): - await mkdir(dirpath) + await makedirs(dirpath, exist_ok=True) interval = duration // ss_nb + 1 cap_time = interval outputs = [] - cmd = '' + cmd = "" for i in range(ss_nb): output = f"{dirpath}SS.{name}_{i:02}.png" outputs.append(output) cmd += f'ffmpeg -hide_banner -loglevel error -ss {cap_time} -i "{video_file}" -q:v 1 -frames:v 1 "{output}"' cap_time += interval if i + 1 != ss_nb: - cmd += ' && ' + cmd += " && " _, err, code = await cmd_exec(cmd, True) if code != 0: LOGGER.error( @@ -183,8 +181,7 @@ async def take_ss(video_file, ss_nb) -> list: async def get_audio_thumb(audio_file): des_dir = "Thumbnails/" - if not await aiopath.exists(des_dir): - await mkdir(des_dir) + await makedirs(des_dir, exist_ok=True) des_dir = f"Thumbnails/{time()}.jpg" cmd = [ "ffmpeg", @@ -209,8 +206,7 @@ async def get_audio_thumb(audio_file): async def create_thumbnail(video_file, duration): des_dir = "Thumbnails" - if not await aiopath.exists(des_dir): - await mkdir(des_dir) + await makedirs(des_dir, exist_ok=True) des_dir = ospath.join(des_dir, f"{time()}.jpg") if duration is None: duration = (await get_media_info(video_file))[0] @@ -244,7 +240,6 @@ async def create_thumbnail(video_file, duration): async def split_file( path, size, - file_, dirpath, split_size, listener, @@ -253,16 +248,9 @@ async def split_file( inLoop=False, multi_streams=True, ): - if ( - listener.suproc == "cancelled" - or listener.suproc is not None - and listener.suproc.returncode == -9 - ): - return False if listener.seed and not listener.newDir: dirpath = f"{dirpath}/splited_files_mltb" - if not await aiopath.exists(dirpath): - await mkdir(dirpath) + await makedirs(dirpath, exist_ok=True) parts = -(-size // listener.splitSize) if listener.equalSplits and not inLoop: split_size = (size // parts) + (size % parts) @@ -270,11 +258,10 @@ async def split_file( if multi_streams: multi_streams = await is_multi_streams(path) duration = (await get_media_info(path))[0] - base_name, extension = ospath.splitext(file_) + base_name, extension = ospath.splitext(path) split_size -= 5000000 while i <= parts or start_time < duration - 4: - parted_name = f"{base_name}.part{i:03}{extension}" - out_path = ospath.join(dirpath, parted_name) + out_path = f"{base_name}.part{i:03}{extension}" cmd = [ "ffmpeg", "-hide_banner", @@ -301,13 +288,10 @@ async def split_file( if not multi_streams: del cmd[10] del cmd[10] - if ( - listener.suproc == "cancelled" - or listener.suproc is not None - and listener.suproc.returncode == -9 - ): - return False - listener.suproc = await create_subprocess_exec(*cmd, stderr=PIPE) + async with subprocess_lock: + if listener.suproc == "cancelled": + return False + listener.suproc = await create_subprocess_exec(*cmd, stderr=PIPE) _, stderr = await listener.suproc.communicate() code = listener.suproc.returncode if code == -9: @@ -325,7 +309,6 @@ async def split_file( return await split_file( path, size, - file_, dirpath, split_size, listener, @@ -347,7 +330,6 @@ async def split_file( return await split_file( path, size, - file_, dirpath, split_size, listener, @@ -373,16 +355,19 @@ async def split_file( start_time += lpd - 3 i += 1 else: - out_path = ospath.join(dirpath, f"{file_}.") - listener.suproc = await create_subprocess_exec( - "split", - "--numeric-suffixes=1", - "--suffix-length=3", - f"--bytes={split_size}", - path, - out_path, - stderr=PIPE, - ) + out_path = f"{path}." + async with subprocess_lock: + if listener.suproc == "cancelled": + return False + listener.suproc = await create_subprocess_exec( + "split", + "--numeric-suffixes=1", + "--suffix-length=3", + f"--bytes={split_size}", + path, + out_path, + stderr=PIPE, + ) _, stderr = await listener.suproc.communicate() code = listener.suproc.returncode if code == -9: @@ -394,13 +379,13 @@ async def split_file( async def createSampleVideo( - listener, input_file, sample_duration, part_duration, oneFile=False + listener, video_file, sample_duration, part_duration, oneFile=False ): filter_complex = "" - dir, name = input_file.rsplit("/", 1) + dir, name = video_file.rsplit("/", 1) output_file = f"{dir}/SAMPLE.{name}" segments = [(0, part_duration)] - duration = (await get_media_info(input_file))[0] + duration = (await get_media_info(video_file))[0] remaining_duration = duration - (part_duration * 2) parts = (sample_duration - (part_duration * 2)) // part_duration time_interval = remaining_duration // parts @@ -426,7 +411,7 @@ async def createSampleVideo( cmd = [ "ffmpeg", "-i", - input_file, + video_file, "-filter_complex", filter_complex, "-map", @@ -437,14 +422,12 @@ async def createSampleVideo( "libx264", "-c:a", "aac", + "-threads", + f"{cpu_count()//2}", output_file, ] - if ( - listener.suproc == "cancelled" - or listener.suproc is not None - and listener.suproc.returncode == -9 - ): + if listener.suproc == "cancelled": return False listener.suproc = await create_subprocess_exec(*cmd, stderr=PIPE) _, stderr = await listener.suproc.communicate() @@ -454,15 +437,15 @@ async def createSampleVideo( elif code != 0: stderr = stderr.decode().strip() LOGGER.error( - f"{stderr}. Something went wrong while creating sample video, mostly file is corrupted. Path: {input_file}" + f"{stderr}. Something went wrong while creating sample video, mostly file is corrupted. Path: {video_file}" ) - return input_file + return video_file else: if oneFile: - newDir = input_file.rsplit(".", 1)[0] - await mkdir(newDir) + newDir, _ = ospath.splitext(video_file) + await makedirs(newDir, exist_ok=True) await gather( - move(input_file, f"{newDir}/{name}"), + move(video_file, f"{newDir}/{name}"), move(output_file, f"{newDir}/SAMPLE.{name}"), ) return newDir diff --git a/bot/helper/mirror_utils/rclone_utils/transfer.py b/bot/helper/mirror_utils/rclone_utils/transfer.py index 496e91b3bee..22d312f5786 100644 --- a/bot/helper/mirror_utils/rclone_utils/transfer.py +++ b/bot/helper/mirror_utils/rclone_utils/transfer.py @@ -2,7 +2,7 @@ from asyncio.subprocess import PIPE from re import findall as re_findall from json import loads -from aiofiles.os import path as aiopath, mkdir, listdir +from aiofiles.os import path as aiopath, makedirs, listdir from aiofiles import open as aiopen from configparser import ConfigParser from random import randrange @@ -95,10 +95,9 @@ def _switchServiceAccount(self): async def _create_rc_sa(self, remote, remote_opts): sa_conf_dir = "rclone_sa" sa_conf_file = f"{sa_conf_dir}/{remote}.conf" - if not await aiopath.isdir(sa_conf_dir): - await mkdir(sa_conf_dir) - elif await aiopath.isfile(sa_conf_file): + if await aiopath.isfile(sa_conf_file): return sa_conf_file + await makedirs(sa_conf_dir, exist_ok=True) if gd_id := remote_opts.get("team_drive"): option = "team_drive" diff --git a/bot/helper/mirror_utils/status_utils/extract_status.py b/bot/helper/mirror_utils/status_utils/extract_status.py index 0feebb2ed91..46bb21254c9 100644 --- a/bot/helper/mirror_utils/status_utils/extract_status.py +++ b/bot/helper/mirror_utils/status_utils/extract_status.py @@ -1,6 +1,6 @@ from time import time -from bot import LOGGER +from bot import LOGGER, subprocess_lock from bot.helper.ext_utils.status_utils import ( get_readable_file_size, MirrorStatus, @@ -65,8 +65,12 @@ def task(self): async def cancel_task(self): LOGGER.info(f"Cancelling Extract: {self.listener.name}") - if self.listener.suproc is not None: - self.listener.suproc.kill() - else: - self.listener.suproc = "cancelled" + async with subprocess_lock: + if ( + self.listener.suproc is not None + and self.listener.suproc.returncode is None + ): + self.listener.suproc.kill() + else: + self.listener.suproc = "cancelled" await self.listener.onUploadError("extracting stopped by user!") diff --git a/bot/helper/mirror_utils/status_utils/sample_video_status.py b/bot/helper/mirror_utils/status_utils/sample_video_status.py index dc2db2c098f..22c4b3563f4 100644 --- a/bot/helper/mirror_utils/status_utils/sample_video_status.py +++ b/bot/helper/mirror_utils/status_utils/sample_video_status.py @@ -37,7 +37,10 @@ def task(self): async def cancel_task(self): LOGGER.info(f"Cancelling Sample Video: {self.listener.name}") - if self.listener.suproc is not None: + if ( + self.listener.suproc is not None + and self.listener.suproc.returncode is None + ): self.listener.suproc.kill() else: self.listener.suproc = "cancelled" diff --git a/bot/helper/mirror_utils/status_utils/split_status.py b/bot/helper/mirror_utils/status_utils/split_status.py index 7ab9aa5adab..3b6f4d30a0e 100644 --- a/bot/helper/mirror_utils/status_utils/split_status.py +++ b/bot/helper/mirror_utils/status_utils/split_status.py @@ -1,4 +1,4 @@ -from bot import LOGGER +from bot import LOGGER, subprocess_lock from bot.helper.ext_utils.status_utils import get_readable_file_size, MirrorStatus @@ -37,8 +37,12 @@ def task(self): async def cancel_task(self): LOGGER.info(f"Cancelling Split: {self.listener.name}") - if self.listener.suproc is not None: - self.listener.suproc.kill() - else: - self.listener.suproc = "cancelled" + async with subprocess_lock: + if ( + self.listener.suproc is not None + and self.listener.suproc.returncode is None + ): + self.listener.suproc.kill() + else: + self.listener.suproc = "cancelled" await self.listener.onUploadError("splitting stopped by user!") diff --git a/bot/helper/mirror_utils/status_utils/zip_status.py b/bot/helper/mirror_utils/status_utils/zip_status.py index d510dc53221..fad6f56f13a 100644 --- a/bot/helper/mirror_utils/status_utils/zip_status.py +++ b/bot/helper/mirror_utils/status_utils/zip_status.py @@ -1,6 +1,6 @@ from time import time -from bot import LOGGER +from bot import LOGGER, subprocess_lock from bot.helper.ext_utils.status_utils import ( get_readable_file_size, MirrorStatus, @@ -65,8 +65,12 @@ def task(self): async def cancel_task(self): LOGGER.info(f"Cancelling Archive: {self.listener.name}") - if self.listener.suproc is not None: - self.listener.suproc.kill() - else: - self.listener.suproc = "cancelled" + async with subprocess_lock: + if ( + self.listener.suproc is not None + and self.listener.suproc.returncode is None + ): + self.listener.suproc.kill() + else: + self.listener.suproc = "cancelled" await self.listener.onUploadError("archiving stopped by user!") diff --git a/bot/modules/users_settings.py b/bot/modules/users_settings.py index d58d74c89fd..67fca72b7b7 100644 --- a/bot/modules/users_settings.py +++ b/bot/modules/users_settings.py @@ -1,6 +1,6 @@ from pyrogram.handlers import MessageHandler, CallbackQueryHandler from pyrogram.filters import command, regex, create -from aiofiles.os import remove as aioremove, path as aiopath, mkdir +from aiofiles.os import remove as aioremove, path as aiopath, makedirs from os import getcwd from time import time from functools import partial @@ -204,8 +204,7 @@ async def add_rclone(_, message, pre_event): user_id = message.from_user.id handler_dict[user_id] = False path = f"{getcwd()}/rclone/" - if not await aiopath.isdir(path): - await mkdir(path) + await makedirs(path, exist_ok=True) des_dir = f"{path}{user_id}.conf" await message.download(file_name=des_dir) update_user_ldata(user_id, "rclone_config", f"rclone/{user_id}.conf") @@ -219,8 +218,7 @@ async def add_token_pickle(_, message, pre_event): user_id = message.from_user.id handler_dict[user_id] = False path = f"{getcwd()}/tokens/" - if not await aiopath.isdir(path): - await mkdir(path) + await makedirs(path, exist_ok=True) des_dir = f"{path}{user_id}.pickle" await message.download(file_name=des_dir) update_user_ldata(user_id, "token_pickle", f"tokens/{user_id}.pickle")