From 4749790b99438761f16b4e1498a6631faaa81cd6 Mon Sep 17 00:00:00 2001 From: anasty17 Date: Wed, 1 Jan 2025 03:32:24 +0200 Subject: [PATCH] Add ffmpeg progress - Get progress info from 7z stdout for extarct and zip - Change ffmpeg_cmds input. Check readme! - Add all audios to media while convert -- ToDo -- (Next Commit) - Change how user add yt-dlp options and make it dict - Refactor usettings, user can add part option instead of edit the whole option like extension filter or ffmpegcmds or yt-dlp options - Add auto sessions restart interval - Add upload paths in config.py also as dict - Add merge video/audio/subtitles Signed-off-by: anasty17 --- README.md | 12 +- bot/__init__.py | 1 - bot/core/config_manager.py | 2 +- bot/helper/common.py | 127 ++++++++++++------ bot/helper/ext_utils/bot_utils.py | 19 +-- bot/helper/ext_utils/db_handler.py | 4 +- bot/helper/ext_utils/files_utils.py | 9 +- bot/helper/ext_utils/help_messages.py | 1 + bot/helper/ext_utils/media_utils.py | 112 ++++++++------- bot/helper/ext_utils/status_utils.py | 28 ++-- bot/helper/listeners/task_listener.py | 29 +++- .../status_utils/ffmpeg_status.py | 61 ++++++++- .../status_utils/sevenz_status.py | 87 ++++++++---- .../status_utils/yt_dlp_status.py | 10 +- .../mirror_leech_utils/telegram_uploader.py | 3 + bot/modules/bot_settings.py | 2 + bot/modules/mirror_leech.py | 5 +- bot/modules/restart.py | 2 +- bot/modules/users_settings.py | 11 +- bot/modules/ytdlp.py | 5 +- config_sample.py | 2 +- 21 files changed, 370 insertions(+), 162 deletions(-) diff --git a/README.md b/README.md index 5916ce78d8c..1955fcc7581 100644 --- a/README.md +++ b/README.md @@ -200,6 +200,7 @@ programming in Python. - Ability to save upload paths - Name Substitution to rename the files before upload - User can select whether he want to use his rclone.conf/token.pickle without adding mpt: or mrcc: before path/gd-id +- FFmpeg commands to execute it after download (task option) - Supported Direct links Generators: > mediafire (file/folders), hxfile.co (need cookies txt with name) [hxfile.txt], streamtape.com, streamsb.net, streamhub.ink, @@ -304,10 +305,12 @@ Fill up rest of the fields. Meaning of each field is discussed below. - `USE_SERVICE_ACCOUNTS`: Whether to use Service Accounts or not, with google-api-python-client. For this to work see [Using Service Accounts](https://github.com/anasty17/mirror-leech-telegram-bot#generate-service-accounts-what-is-service-account) section below. Default is `False`. `Bool` -- `FFMPEG_CMDS`: list of ffmpeg commands. You can set multiple ffmpeg commands for all files before upload. Don't write ffmpeg at beginning, start directly with the arguments. `List` - - Examples: ["-i mltb.mkv -c copy -c:s srt mltb.mkv", "-i mltb.video -c copy -c:s srt mltb", "-i mltb.m4a -c:a libmp3lame -q:a 2 mltb.mp3", "-i mltb.audio -c:a libmp3lame -q:a 2 mltb.mp3"] +- `FFMPEG_CMDS`: Dict of list values of ffmpeg commands. You can set multiple ffmpeg commands for all files before upload. Don't write ffmpeg at beginning, start directly with the arguments. `Dict` + - Examples: {"subtitle": ["-i mltb.mkv -c copy -c:s srt mltb.mkv", "-i mltb.video -c copy -c:s srt mltb"], "convert": ["-i mltb.m4a -c:a libmp3lame -q:a 2 mltb.mp3", "-i mltb.audio -c:a libmp3lame -q:a 2 mltb.mp3"]} **Notes**: + - Don't add ffmpeg at the beginning! - Add `-del` to the list which you want from the bot to delete the original files after command run complete! + - To execute one of those lists in bot for example, you must use -ff subtitle (list key) or -ff convert (list key) - Seed will get disbaled while using this option **Example**: - Here I will explain how to use mltb.* which is reference to files you want to work on. @@ -470,7 +473,7 @@ sudo docker build . -t mltb - Run the image: ``` -sudo docker run -p 80:80 -p 8080:8080 mltb +sudo docker run -p 80:80 -p 8080:8080 -p 8070:8070 -p 8090:8090 mltb ``` - To stop the running image: @@ -532,6 +535,8 @@ sudo docker compose logs --follow ------ + + **IMPORTANT NOTES**: 1. Set `BASE_URL_PORT` and `RCLONE_SERVE_PORT` variables to any port you want to use. Default is `80` and `8080` @@ -542,7 +547,6 @@ sudo docker compose logs --follow ------ - diff --git a/bot/__init__.py b/bot/__init__.py index 895cc379b67..1c8054e74a6 100644 --- a/bot/__init__.py +++ b/bot/__init__.py @@ -65,7 +65,6 @@ nzb_listener_lock = Lock() jd_lock = Lock() cpu_eater_lock = Lock() -subprocess_lock = Lock() same_directory_lock = Lock() extension_filter = ["aria2", "!qB"] drives_names = [] diff --git a/bot/core/config_manager.py b/bot/core/config_manager.py index 5d37c1239fe..7a627afb449 100644 --- a/bot/core/config_manager.py +++ b/bot/core/config_manager.py @@ -13,7 +13,7 @@ class Config: DOWNLOAD_DIR = "/usr/src/app/downloads/" EQUAL_SPLITS = False EXTENSION_FILTER = "" - FFMPEG_CMDS = [] + FFMPEG_CMDS = {} FILELION_API = "" GDRIVE_ID = "" INCOMPLETE_TASK_NOTIFIER = False diff --git a/bot/helper/common.py b/bot/helper/common.py index b8e2b96a6ff..cabacaa4cf2 100644 --- a/bot/helper/common.py +++ b/bot/helper/common.py @@ -1,5 +1,5 @@ from aiofiles.os import path as aiopath, remove, makedirs -from asyncio import sleep, create_subprocess_exec, gather +from asyncio import sleep, create_subprocess_exec, gather, Lock from asyncio.subprocess import PIPE from os import walk, path as ospath from secrets import token_urlsafe @@ -15,7 +15,6 @@ task_dict, extension_filter, cpu_eater_lock, - subprocess_lock, intervals, ) from ..core.config_manager import Config @@ -73,6 +72,7 @@ def __init__(self): self.rc_flags = "" self.tag = "" self.name = "" + self.subname = "" self.new_dir = "" self.name_sub = "" self.thumbnail_layout = "" @@ -81,6 +81,7 @@ def __init__(self): self.max_split_size = 0 self.multi = 0 self.size = 0 + self.subsize = 0 self.is_leech = False self.is_qbit = False self.is_nzb = False @@ -110,9 +111,11 @@ def __init__(self): self.is_torrent = False self.as_med = False self.as_doc = False + self.is_file = False self.ffmpeg_cmds = None self.chat_thread_id = None self.subproc = None + self.subprocess_lock = Lock() self.thumb = None self.extension_filter = [] self.is_super_chat = self.message.chat.type.name in ["SUPERGROUP", "CHANNEL"] @@ -205,11 +208,14 @@ async def before_start(self): ): self.up_dest = self.user_dict["upload_paths"][self.up_dest] - self.ffmpeg_cmds = ( - self.ffmpeg_cmds - or self.user_dict.get("ffmpeg_cmds", None) - or (Config.FFMPEG_CMDS if "ffmpeg_cmds" not in self.user_dict else None) - ) + if self.ffmpeg_cmds and not isinstance(self.ffmpeg_cmds, list): + if self.user_dict.get("ffmpeg_cmds", None): + self.ffmpeg_cmds = self.user_dict["ffmpeg_cmds"].get(self.ffmpeg_cmds, None) + elif "ffmpeg_cmds" not in self.user_dict and Config.FFMPEG_CMDS: + self.ffmpeg_cmds = Config.FFMPEG_CMDS.get(self.ffmpeg_cmds, None) + else: + self.ffmpeg_cmds = None + if self.ffmpeg_cmds: self.seed = False @@ -558,17 +564,16 @@ async def decompress_zst(self, dl_path, is_dir=False): cmd = ["unzstd", f_path, "-o", out_path] if self.is_cancelled: return "" - async with subprocess_lock: + async with self.subprocess_lock: self.subproc = await create_subprocess_exec( *cmd, stderr=PIPE ) - _, stderr = await self.subproc.communicate() + code = await self.subproc.wait() if self.is_cancelled: return "" - code = self.subproc.returncode if code != 0: try: - stderr = stderr.decode().strip() + stderr = (await self.subproc.stderr.read()).decode().strip() except: stderr = "Unable to decode the error!" LOGGER.error( @@ -576,26 +581,27 @@ async def decompress_zst(self, dl_path, is_dir=False): ) elif not self.seed: await remove(f_path) + self.subproc = None return elif dl_path.endswith(".zst"): out_path = get_base_name(dl_path) cmd = ["unzstd", dl_path, "-o", out_path] if self.is_cancelled: return "" - async with subprocess_lock: + async with self.subprocess_lock: self.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await self.subproc.communicate() + code = await self.subproc.wait() if self.is_cancelled: return "" - code = self.subproc.returncode if code != 0: try: - stderr = stderr.decode().strip() + stderr = (await self.subproc.stderr.read()).decode().strip() except: stderr = "Unable to decode the error!" LOGGER.error(f"{stderr}. Unable to extract zst file!. Path: {dl_path}") elif not self.seed: await remove(dl_path) + self.subproc = None return out_path return dl_path @@ -605,7 +611,7 @@ async def proceed_extract(self, dl_path, gid): LOGGER.info(f"Extracting: {self.name}") async with task_dict_lock: task_dict[self.mid] = SevenZStatus(self, gid, "Extract") - if await aiopath.isdir(dl_path): + if not self.is_file: if self.seed: self.new_dir = f"{self.dir}10000" up_path = f"{self.new_dir}/{self.name}" @@ -636,22 +642,25 @@ async def proceed_extract(self, dl_path, gid): f"-o{t_path}", "-aot", "-xr!@PaxHeader", + "-bsp1", + "-bse1", + "-bb3", ] if not pswd: del cmd[2] if self.is_cancelled: return "" - async with subprocess_lock: + self.subname = file_ + async with self.subprocess_lock: self.subproc = await create_subprocess_exec( - *cmd, stderr=PIPE + *cmd, stdout=PIPE, stderr=PIPE ) - _, stderr = await self.subproc.communicate() + code = await self.subproc.wait() if self.is_cancelled: return "" - code = self.subproc.returncode if code != 0: try: - stderr = stderr.decode().strip() + stderr = (await self.subproc.stderr.read()).decode().strip() except: stderr = "Unable to decode the error!" LOGGER.error( @@ -669,6 +678,7 @@ async def proceed_extract(self, dl_path, gid): await remove(del_path) except: self.is_cancelled = True + self.subproc = None return up_path else: dl_path = await self.decompress_zst(dl_path) @@ -684,17 +694,21 @@ async def proceed_extract(self, dl_path, gid): f"-o{up_path}", "-aot", "-xr!@PaxHeader", + "-bsp1", + "-bse1", + "-bb3", ] if not pswd: del cmd[2] if self.is_cancelled: return "" - async with subprocess_lock: - self.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await self.subproc.communicate() + async with self.subprocess_lock: + self.subproc = await create_subprocess_exec( + *cmd, stdout=PIPE, stderr=PIPE + ) + code = await self.subproc.wait() if self.is_cancelled: return "" - code = self.subproc.returncode if code == -9: self.is_cancelled = True return "" @@ -705,22 +719,25 @@ async def proceed_extract(self, dl_path, gid): await remove(dl_path) except: self.is_cancelled = True + self.subproc = None return up_path else: try: - stderr = stderr.decode().strip() + stderr = (await self.subproc.stderr.read()).decode().strip() except: stderr = "Unable to decode the error!" LOGGER.error( f"{stderr}. Unable to extract archive! Uploading anyway. Path: {dl_path}" ) self.new_dir = "" + self.subproc = None return dl_path except NotSupportedExtractionArchive: LOGGER.info( f"Not any valid archive, uploading file as it is. Path: {dl_path}" ) self.new_dir = "" + self.subproc = None return dl_path async def proceed_compress(self, dl_path, gid, o_files, ft_delete): @@ -744,12 +761,15 @@ async def proceed_compress(self, dl_path, gid, o_files, ft_delete): "7z", f"-v{split_size}b", "a", - "-mx=0", + "-mx=9", f"-p{pswd}", up_path, dl_path, + "-bsp1", + "-bse1", + "-bb3", ] - if await aiopath.isdir(dl_path): + if not self.is_file: cmd.extend(f"-xr!*.{ext}" for ext in self.extension_filter) if o_files: for f in o_files: @@ -769,12 +789,11 @@ async def proceed_compress(self, dl_path, gid, o_files, ft_delete): LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}") if self.is_cancelled: return "" - async with subprocess_lock: - self.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await self.subproc.communicate() + async with self.subprocess_lock: + self.subproc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE) + code = await self.subproc.wait() if self.is_cancelled: return "" - code = self.subproc.returncode if code == -9: self.is_cancelled = True return "" @@ -788,16 +807,18 @@ async def proceed_compress(self, dl_path, gid, o_files, ft_delete): except: pass ft_delete.clear() + self.subproc = None return up_path else: await clean_target(self.new_dir) if not delete: self.new_dir = "" try: - stderr = stderr.decode().strip() + stderr = (await self.subproc.stderr.read()).decode().strip() except: stderr = "Unable to decode the error!" LOGGER.error(f"{stderr}. Unable to zip this path: {dl_path}") + self.subproc = None return dl_path async def proceed_split(self, up_dir, m_size, o_files, gid): @@ -814,9 +835,15 @@ async def proceed_split(self, up_dir, m_size, o_files, gid): async with task_dict_lock: task_dict[self.mid] = FFmpegStatus(self, gid, "Split") LOGGER.info(f"Splitting: {self.name}") + if self.is_file: + self.subsize = self.size + else: + self.subsize = f_size + self.subname = file_ res = await split_file( f_path, f_size, dirpath, file_, self.split_size, self ) + self.subproc = None if self.is_cancelled: return if not res: @@ -854,14 +881,16 @@ async def generate_sample_video(self, dl_path, gid, unwanted_files, ft_delete): task_dict[self.mid] = FFmpegStatus(self, gid, "Sample Video") checked = False - if await aiopath.isfile(dl_path): + if self.is_file: if (await get_document_type(dl_path))[0]: checked = True async with cpu_eater_lock: LOGGER.info(f"Creating Sample video: {self.name}") + self.subsize = self.size res = await create_sample_video( self, dl_path, sample_duration, part_duration ) + self.subproc = None if res: new_folder = ospath.splitext(dl_path)[0] name = dl_path.rsplit("/", 1)[1] @@ -897,9 +926,12 @@ async def generate_sample_video(self, dl_path, gid, unwanted_files, ft_delete): if checked: cpu_eater_lock.release() return "" + self.subsize = await aiopath.getsize(f_path) + self.subname = file_ res = await create_sample_video( self, f_path, sample_duration, part_duration ) + self.subproc = None if res: ft_delete.append(res) if checked: @@ -969,7 +1001,13 @@ async def proceed_convert(m_path): LOGGER.info(f"Converting: {self.name}") else: LOGGER.info(f"Converting: {m_path}") + if self.is_file: + self.subsize = self.size + else: + self.subsize = await aiopath.getsize(m_path) + self.subname = m_path.rsplit("/", 1)[-1] res = await convert_video(self, m_path, vext) + self.subproc = None return "" if self.is_cancelled else res elif ( is_audio @@ -992,12 +1030,18 @@ async def proceed_convert(m_path): LOGGER.info(f"Converting: {self.name}") else: LOGGER.info(f"Converting: {m_path}") + if self.is_file: + self.subsize = self.size + else: + self.subsize = await aiopath.getsize(m_path) + self.subname = m_path.rsplit("/", 1)[-1] res = await convert_audio(self, m_path, aext) + self.subproc = None return "" if self.is_cancelled else res else: return "" - if await aiopath.isfile(dl_path): + if self.is_file: output_file = await proceed_convert(dl_path) if checked: cpu_eater_lock.release() @@ -1040,7 +1084,7 @@ async def proceed_convert(m_path): async def generate_screenshots(self, dl_path): ss_nb = int(self.screen_shots) if isinstance(self.screen_shots, str) else 10 - if await aiopath.isfile(dl_path): + if self.is_file: if (await get_document_type(dl_path))[0]: LOGGER.info(f"Creating Screenshot for: {dl_path}") res = await take_ss(dl_path, ss_nb) @@ -1072,7 +1116,7 @@ async def generate_screenshots(self, dl_path): return dl_path async def substitute(self, dl_path): - if await aiopath.isfile(dl_path): + if self.is_file: up_dir, name = dl_path.rsplit("/", 1) for substitution in self.name_sub: sen = False @@ -1144,6 +1188,8 @@ async def proceed_ffmpeg(self, dl_path, gid): "-hide_banner", "-loglevel", "error", + "-progress", + "pipe:1", ] + ffmpeg_cmd if "-del" in cmd: cmd.remove("-del") @@ -1160,7 +1206,7 @@ async def proceed_ffmpeg(self, dl_path, gid): ext = "all" else: ext = ospath.splitext(input_file)[-1] - if await aiopath.isfile(dl_path): + if self.is_file: is_video, is_audio, _ = await get_document_type(dl_path) if not is_video and not is_audio: break @@ -1183,7 +1229,9 @@ async def proceed_ffmpeg(self, dl_path, gid): await cpu_eater_lock.acquire() LOGGER.info(f"Running ffmpeg cmd for: {file_path}") cmd[index + 1] = file_path + self.subsize = self.size res = await run_ffmpeg_cmd(self, cmd, file_path) + self.subproc = None if res and delete_files: await remove(file_path) directory = ospath.dirname(res) @@ -1217,7 +1265,10 @@ async def proceed_ffmpeg(self, dl_path, gid): task_dict[self.mid] = FFmpegStatus(self, gid, "FFmpeg") await cpu_eater_lock.acquire() LOGGER.info(f"Running ffmpeg cmd for: {f_path}") + self.subsize = await aiopath.getsize(file_path) + self.subname = file_ res = await run_ffmpeg_cmd(self, cmd, f_path) + self.subproc = None if res and delete_files: await remove(f_path) directory = ospath.dirname(res) diff --git a/bot/helper/ext_utils/bot_utils.py b/bot/helper/ext_utils/bot_utils.py index cefdae945c4..d3913a9fe49 100644 --- a/bot/helper/ext_utils/bot_utils.py +++ b/bot/helper/ext_utils/bot_utils.py @@ -117,7 +117,13 @@ def process_argument_with_values(start_index): values = [] for j in range(start_index + 1, total): if items[j] in arg_base: - break + check = " ".join(values).strip() + if check.startswith("[") and check.endswith("]"): + break + elif check.startswith("["): + pass + else: + break values.append(items[j]) return values @@ -135,17 +141,6 @@ def process_argument_with_values(start_index): in ["-s", "-j", "-f", "-fd", "-fu", "-sync", "-ml", "-doc", "-med"] ): arg_base[part] = True - elif part == "-ff": - i += 1 - if i < total: - values = [] - while i < total: - values.append(items[i]) - if items[i].endswith("]"): - break - else: - i += 1 - arg_base[part] = " ".join(values) else: sub_list = process_argument_with_values(i) if sub_list: diff --git a/bot/helper/ext_utils/db_handler.py b/bot/helper/ext_utils/db_handler.py index c40d1cc2282..4c974afc1c8 100644 --- a/bot/helper/ext_utils/db_handler.py +++ b/bot/helper/ext_utils/db_handler.py @@ -74,8 +74,8 @@ async def update_qbittorrent(self, key, value): async def save_qbit_settings(self): if self._return: return - await self.db.settings.qbittorrent.replace_one( - {"_id": TgClient.ID}, qbit_options, upsert=True + await self.db.settings.qbittorrent.update_one( + {"_id": TgClient.ID}, {"$set": qbit_options}, upsert=True ) async def update_private_file(self, path): diff --git a/bot/helper/ext_utils/files_utils.py b/bot/helper/ext_utils/files_utils.py index 272b485e213..d7fc3324fa0 100644 --- a/bot/helper/ext_utils/files_utils.py +++ b/bot/helper/ext_utils/files_utils.py @@ -107,7 +107,14 @@ def exit_clean_up(signal, frame): try: LOGGER.info("Please wait, while we clean up and stop the running downloads") clean_all() - srun(["pkill", "-9", "-f", "gunicorn|aria2c|qbittorrent-nox|ffmpeg|java"]) + srun( + [ + "pkill", + "-9", + "-f", + "gunicorn|aria2c|qbittorrent-nox|ffmpeg|java|sabnzbdplus|7z", + ] + ) exit(0) except KeyboardInterrupt: LOGGER.warning("Force Exiting before the cleanup finishes!") diff --git a/bot/helper/ext_utils/help_messages.py b/bot/helper/ext_utils/help_messages.py index fc726790fd9..56e97d25014 100644 --- a/bot/helper/ext_utils/help_messages.py +++ b/bot/helper/ext_utils/help_messages.py @@ -241,6 +241,7 @@ Notes: 1. Add -del to the list(s) which you want from the bot to delete the original files after command run complete! 2. Seed will get disbaled while using this option +3. To execute one of pre-added lists in bot like: ({"subtitle": ["-i mltb.mkv -c copy -c:s srt mltb.mkv"]}), you must use -ff subtitle (list key) Examples: ["-i mltb.mkv -c copy -c:s srt mltb.mkv", "-i mltb.video -c copy -c:s srt mltb", "-i mltb.m4a -c:a libmp3lame -q:a 2 mltb.mp3", "-i mltb.audio -c:a libmp3lame -q:a 2 mltb.mp3"] Here I will explain how to use mltb.* which is reference to files you want to work on. 1. First cmd: the input is mltb.mkv so this cmd will work only on mkv videos and the output is mltb.mkv also so all outputs is mkv. -del will delete the original media after complete run of the cmd. diff --git a/bot/helper/ext_utils/media_utils.py b/bot/helper/ext_utils/media_utils.py index 2803cf01bf6..7f9b0f31088 100644 --- a/bot/helper/ext_utils/media_utils.py +++ b/bot/helper/ext_utils/media_utils.py @@ -7,7 +7,7 @@ from time import time from aioshutil import rmtree -from ... import LOGGER, subprocess_lock +from ... import LOGGER from ...core.config_manager import Config from .bot_utils import cmd_exec, sync_to_async from .files_utils import ARCH_EXT, get_mime_type @@ -22,8 +22,14 @@ async def convert_video(listener, video_file, ext, retry=False): "-hide_banner", "-loglevel", "error", + "-progress", + "pipe:1", "-i", video_file, + "-map", + "0:v", + "-map", + "0:a", "-c:v", "libx264", "-c:a", @@ -33,17 +39,19 @@ async def convert_video(listener, video_file, ext, retry=False): output, ] if ext == "mp4": - cmd[10:10] = ["-c:s", "mov_text"] + cmd[16:16] = ["-c:s", "mov_text"] elif ext == "mkv": - cmd[10:10] = ["-c:s", "ass"] + cmd[16:16] = ["-c:s", "ass"] else: - cmd[10:10] = ["-c:s", "copy"] + cmd[16:16] = ["-c:s", "copy"] else: cmd = [ "ffmpeg", "-hide_banner", "-loglevel", "error", + "-progress", + "pipe:1", "-i", video_file, "-map", @@ -54,12 +62,12 @@ async def convert_video(listener, video_file, ext, retry=False): ] if listener.is_cancelled: return False - async with subprocess_lock: - listener.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await listener.subproc.communicate() - if listener.is_cancelled: - return False - code = listener.subproc.returncode + async with listener.subprocess_lock: + listener.subproc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE) + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False if code == 0: return output elif code == -9: @@ -72,7 +80,7 @@ async def convert_video(listener, video_file, ext, retry=False): return await convert_video(listener, video_file, ext, True) else: try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except: stderr = "Unable to decode the error!" LOGGER.error( @@ -89,6 +97,8 @@ async def convert_audio(listener, audio_file, ext): "-hide_banner", "-loglevel", "error", + "-progress", + "pipe:1", "-i", audio_file, "-threads", @@ -97,12 +107,12 @@ async def convert_audio(listener, audio_file, ext): ] if listener.is_cancelled: return False - async with subprocess_lock: - listener.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await listener.subproc.communicate() - if listener.is_cancelled: - return False - code = listener.subproc.returncode + async with listener.subprocess_lock: + listener.subproc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE) + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False if code == 0: return output elif code == -9: @@ -110,7 +120,7 @@ async def convert_audio(listener, audio_file, ext): return False else: try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except: stderr = "Unable to decode the error!" LOGGER.error( @@ -445,6 +455,8 @@ async def split_file( "-hide_banner", "-loglevel", "error", + "-progress", + "pipe:1", "-ss", str(start_time), "-i", @@ -464,22 +476,25 @@ async def split_file( out_path, ] if not multi_streams: - del cmd[10] - del cmd[10] - if listener.is_cancelled: - return False - async with subprocess_lock: - listener.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await listener.subproc.communicate() + del cmd[12] + del cmd[12] if listener.is_cancelled: return False - code = listener.subproc.returncode + async with listener.subprocess_lock: + listener.subproc = await create_subprocess_exec( + *cmd, stdout=PIPE, stderr=PIPE + ) + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False + code = listener.subproc.returncode if code == -9: listener.is_cancelled = True return False elif code != 0: try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except: stderr = "Unable to decode the error!" try: @@ -541,8 +556,9 @@ async def split_file( start_time += lpd - 3 i += 1 else: + listener.subsize = 0 out_path = f"{dirpath}/{file_}." - async with subprocess_lock: + async with listener.subprocess_lock: if listener.is_cancelled: return False listener.subproc = await create_subprocess_exec( @@ -554,16 +570,16 @@ async def split_file( out_path, stderr=PIPE, ) - _, stderr = await listener.subproc.communicate() - if listener.is_cancelled: - return False - code = listener.subproc.returncode + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False if code == -9: listener.is_cancelled = True return False elif code != 0: try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except: stderr = "Unable to decode the error!" LOGGER.error(f"{stderr}. Split Document: {path}") @@ -603,6 +619,8 @@ async def create_sample_video(listener, video_file, sample_duration, part_durati "-hide_banner", "-loglevel", "error", + "-progress", + "pipe:1", "-i", video_file, "-filter_complex", @@ -622,12 +640,12 @@ async def create_sample_video(listener, video_file, sample_duration, part_durati if listener.is_cancelled: return False - async with subprocess_lock: - listener.subproc = await create_subprocess_exec(*cmd, stderr=PIPE) - _, stderr = await listener.subproc.communicate() - if listener.is_cancelled: - return False - code = listener.subproc.returncode + async with listener.subprocess_lock: + listener.subproc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE) + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False if code == -9: listener.is_cancelled = True return False @@ -635,7 +653,7 @@ async def create_sample_video(listener, video_file, sample_duration, part_durati return output_file else: try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except Exception: stderr = "Unable to decode the error!" LOGGER.error( @@ -755,12 +773,12 @@ async def run_ffmpeg_cmd(listener, ffmpeg, path): ffmpeg[-1] = output if listener.is_cancelled: return False - async with subprocess_lock: - listener.subproc = await create_subprocess_exec(*ffmpeg, stderr=PIPE) - _, stderr = await listener.subproc.communicate() - if listener.is_cancelled: - return False - code = listener.subproc.returncode + async with listener.subprocess_lock: + listener.subproc = await create_subprocess_exec(*ffmpeg, stdout=PIPE, stderr=PIPE) + code = await listener.subproc.wait() + async with listener.subprocess_lock: + if listener.is_cancelled: + return False if code == 0: return output elif code == -9: @@ -768,7 +786,7 @@ async def run_ffmpeg_cmd(listener, ffmpeg, path): return False else: try: - stderr = stderr.decode().strip() + stderr = (await listener.subproc.stderr.read()).decode().strip() except: stderr = "Unable to decode the error!" LOGGER.error( diff --git a/bot/helper/ext_utils/status_utils.py b/bot/helper/ext_utils/status_utils.py index fb0e6daeebf..95ff6daa2b4 100644 --- a/bot/helper/ext_utils/status_utils.py +++ b/bot/helper/ext_utils/status_utils.py @@ -188,21 +188,31 @@ async def get_readable_message(sid, is_user, page_no=1, status="All", page_step= else: msg += f"{index + start_position}.{tstatus}: " msg += f"{escape(f'{task.name()}')}" - if tstatus not in [ - MirrorStatus.STATUS_SPLIT, - MirrorStatus.STATUS_SEED, - MirrorStatus.STATUS_SAMVID, - MirrorStatus.STATUS_CONVERT, - MirrorStatus.STATUS_FFMPEG, - MirrorStatus.STATUS_QUEUEUP, - ]: + if ( + tstatus + not in [ + MirrorStatus.STATUS_SEED, + MirrorStatus.STATUS_QUEUEUP, + MirrorStatus.STATUS_SPLIT, + ] + or (MirrorStatus.STATUS_SPLIT + and task.listener.subsize) + ): progress = ( await task.progress() if iscoroutinefunction(task.progress) else task.progress() ) + if task.listener.subname: + msg += f"\n{task.listener.subname[:35]}" msg += f"\n{get_progress_bar_string(progress)} {progress}" - msg += f"\nProcessed: {task.processed_bytes()} of {task.size()}" + if task.listener.subname: + size = ( + f"{get_readable_file_size(task.listener.subsize)} ({task.size()})" + ) + else: + size = task.size() + msg += f"\nProcessed: {task.processed_bytes()} of {size}" msg += f"\nSpeed: {task.speed()} | ETA: {task.eta()}" if hasattr(task, "seeders_num"): try: diff --git a/bot/helper/listeners/task_listener.py b/bot/helper/listeners/task_listener.py index ae142a6bef1..ecebba81e93 100644 --- a/bot/helper/listeners/task_listener.py +++ b/bot/helper/listeners/task_listener.py @@ -154,6 +154,7 @@ async def on_download_complete(self): return up_path = f"{self.dir}/{self.name}" + self.is_file = await aiopath.isfile(up_path) self.size = await get_path_size(up_path) if not Config.QUEUE_ALL: async with queue_dict_lock: @@ -161,15 +162,19 @@ async def on_download_complete(self): non_queued_dl.remove(self.mid) await start_from_queued() - if self.join and await aiopath.isdir(up_path): + if self.join and not self.is_file: await join_files(up_path) if self.extract and not self.is_nzb: up_path = await self.proceed_extract(up_path, gid) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) + self.subproc = None + self.subname = "" + self.subsize = 0 if self.ffmpeg_cmds: up_path = await self.proceed_ffmpeg( @@ -178,21 +183,28 @@ async def on_download_complete(self): ) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) + self.subproc = None + self.subname = "" + self.subsize = 0 if self.name_sub: up_path = await self.substitute(up_path) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) self.name = up_path.rsplit("/", 1)[1] if self.screen_shots: up_path = await self.generate_screenshots(up_path) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) + self.subproc = None if self.convert_audio or self.convert_video: up_path = await self.convert_media( @@ -204,8 +216,12 @@ async def on_download_complete(self): ) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) + self.subproc = None + self.subname = "" + self.subsize = 0 if self.sample_video: up_path = await self.generate_sample_video( @@ -213,15 +229,23 @@ async def on_download_complete(self): ) if self.is_cancelled: return + self.is_file = await aiopath.isfile(up_path) up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) + self.subproc = None + self.subname = "" + self.subsize = 0 if self.compress: up_path = await self.proceed_compress( up_path, gid, unwanted_files, files_to_delete ) + self.is_file = await aiopath.isfile(up_path) if self.is_cancelled: return + self.subproc = None + self.subname = "" + self.subsize = 0 up_dir, self.name = up_path.rsplit("/", 1) self.size = await get_path_size(up_dir) @@ -230,6 +254,9 @@ async def on_download_complete(self): await self.proceed_split(up_dir, unwanted_files_size, unwanted_files, gid) if self.is_cancelled: return + self.subproc = None + self.subname = "" + self.subsize = 0 add_to_queue, event = await check_running_tasks(self, "up") await start_from_queued() diff --git a/bot/helper/mirror_leech_utils/status_utils/ffmpeg_status.py b/bot/helper/mirror_leech_utils/status_utils/ffmpeg_status.py index 6abf41f5379..3ee68367c2f 100644 --- a/bot/helper/mirror_leech_utils/status_utils/ffmpeg_status.py +++ b/bot/helper/mirror_leech_utils/status_utils/ffmpeg_status.py @@ -1,14 +1,56 @@ -from .... import LOGGER, subprocess_lock -from ...ext_utils.status_utils import get_readable_file_size, MirrorStatus +from .... import LOGGER +from ...ext_utils.bot_utils import new_task +from ...ext_utils.status_utils import ( + get_readable_file_size, + MirrorStatus, + get_readable_time, +) class FFmpegStatus: def __init__(self, listener, gid, status=""): self.listener = listener self._gid = gid - self._size = self.listener.size + self._processed_bytes = 0 + self._speed_raw = 0 + self._progress_raw = 0 + self._active = False self.cstatus = status + @new_task + async def _ffmpeg_progress(self): + while True: + async with self.listener.subprocess_lock: + if self.listener.subproc is None or self.listener.is_cancelled: + break + line = await self.listener.subproc.stdout.readline() + if not line: + break + line = line.decode().strip() + if "=" in line: + key, value = line.split("=", 1) + if value != "N/A": + if key == "total_size": + self._processed_bytes = int(value) + self._progress_raw = ( + self._processed_bytes / self.listener.subsize * 100 + ) + elif key == "bitrate": + self._speed_raw = (float(value.strip("kbits/s")) / 8) * 1000 + self._active = False + + def speed(self): + return f"{get_readable_file_size(self._speed_raw)}/s" + + def processed_bytes(self): + return get_readable_file_size(self._processed_bytes) + + async def progress(self): + if not self._active and self.listener.subsize and self.listener.subproc is not None: + await self._ffmpeg_progress() + self._active = True + return f"{round(self._progress_raw, 2)}%" + def gid(self): return self._gid @@ -16,7 +58,14 @@ def name(self): return self.listener.name def size(self): - return get_readable_file_size(self._size) + return get_readable_file_size(self.listener.size) + + def eta(self): + try: + seconds = (self.listener.subsize - self._processed_bytes) / self._speed_raw + return get_readable_time(seconds) + except: + return "-" def status(self): if self.cstatus == "Convert": @@ -33,8 +82,8 @@ def task(self): async def cancel_task(self): LOGGER.info(f"Cancelling {self.cstatus}: {self.listener.name}") - self.listener.is_cancelled = True - async with subprocess_lock: + async with self.listener.subprocess_lock: + self.listener.is_cancelled = True if ( self.listener.subproc is not None and self.listener.subproc.returncode is None diff --git a/bot/helper/mirror_leech_utils/status_utils/sevenz_status.py b/bot/helper/mirror_leech_utils/status_utils/sevenz_status.py index 5e2fa2c1682..75777b45d61 100644 --- a/bot/helper/mirror_leech_utils/status_utils/sevenz_status.py +++ b/bot/helper/mirror_leech_utils/status_utils/sevenz_status.py @@ -1,7 +1,8 @@ from time import time +from re import search -from .... import LOGGER, subprocess_lock -from ...ext_utils.files_utils import get_path_size +from .... import LOGGER +from ...ext_utils.bot_utils import new_task from ...ext_utils.status_utils import ( get_readable_file_size, MirrorStatus, @@ -12,40 +13,80 @@ class SevenZStatus: def __init__(self, listener, gid, status=""): self.listener = listener - self._size = self.listener.size self._gid = gid self._start_time = time() - self._proccessed_bytes = 0 + self._processed_bytes = 0 + self._progress_str = "0%" + self._active = False self.cstatus = status + @new_task + async def _sevenz_progress(self): + pattern = r"\b(?:Add\s+new\s+data\s+to\s+archive:.*?,\s+(\d+)\s+bytes|Physical\s+Size\s*=\s*(\d+))" + while True: + async with self.listener.subprocess_lock: + if self.listener.subproc is None or self.listener.is_cancelled: + break + line = await self.listener.subproc.stdout.readline() + line = line.decode().strip() + if line.startswith("Add new data to archive:") or line.startswith( + "Physical Size =" + ): + if match := search(pattern, line): + size = match[1] or match[2] + self.listener.subsize = int(size) + break + s = b"" + while True: + async with self.listener.subprocess_lock: + if self.listener.is_cancelled or self.listener.subproc is None: + break + char = await self.listener.subproc.stdout.read(1) + if not char: + break + s += char + if char == b"%": + try: + self._progress_str = s.decode().rsplit(" ", 1)[-1].strip() + self._processed_bytes = ( + int(self._progress_str.strip("%")) / 100 + ) * self.listener.subsize + except: + self._processed_bytes = 0 + self._progress_str = "0%" + s = b"" + + self._active = False + def gid(self): return self._gid - def speed_raw(self): - return self._proccessed_bytes / (time() - self._start_time) - - async def progress_raw(self): - await self.processed_raw() - try: - return self._proccessed_bytes / self._size * 100 - except: - return 0 + def _speed_raw(self): + return self._processed_bytes / (time() - self._start_time) async def progress(self): - return f"{round(await self.progress_raw(), 2)}%" + if not self._active and self.listener.subproc is not None: + await self._sevenz_progress() + self._active = True + return self._progress_str def speed(self): - return f"{get_readable_file_size(self.speed_raw())}/s" + return f"{get_readable_file_size(self._speed_raw())}/s" + + def processed_bytes(self): + return get_readable_file_size(self._processed_bytes) def name(self): return self.listener.name def size(self): - return get_readable_file_size(self._size) + return get_readable_file_size(self.listener.size) def eta(self): try: - seconds = (self._size - self._proccessed_bytes) / self.speed_raw() + seconds = ( + self.listener.subsize - self._processed_bytes + ) / self._speed_raw() return get_readable_time(seconds) except: return "-" @@ -57,21 +98,15 @@ def status(self): return MirrorStatus.STATUS_ARCHIVE def processed_bytes(self): - return get_readable_file_size(self._proccessed_bytes) - - async def processed_raw(self): - if self.listener.new_dir: - self._proccessed_bytes = await get_path_size(self.listener.new_dir) - else: - self._proccessed_bytes = await get_path_size(self.listener.dir) - self._size + return get_readable_file_size(self._processed_bytes) def task(self): return self async def cancel_task(self): LOGGER.info(f"Cancelling {self.cstatus}: {self.listener.name}") - self.listener.is_cancelled = True - async with subprocess_lock: + async with self.listener.subprocess_lock: + self.listener.is_cancelled = True if ( self.listener.subproc is not None and self.listener.subproc.returncode is None diff --git a/bot/helper/mirror_leech_utils/status_utils/yt_dlp_status.py b/bot/helper/mirror_leech_utils/status_utils/yt_dlp_status.py index 57deecf387f..8bd18406115 100644 --- a/bot/helper/mirror_leech_utils/status_utils/yt_dlp_status.py +++ b/bot/helper/mirror_leech_utils/status_utils/yt_dlp_status.py @@ -11,19 +11,19 @@ def __init__(self, listener, obj, gid): self._obj = obj self._gid = gid self.listener = listener - self._proccessed_bytes = 0 + self._processed_bytes = 0 def gid(self): return self._gid def processed_bytes(self): - return get_readable_file_size(self._proccessed_bytes) + return get_readable_file_size(self._processed_bytes) async def processed_raw(self): if self._obj.downloaded_bytes != 0: - self._proccessed_bytes = self._obj.downloaded_bytes + self._processed_bytes = self._obj.downloaded_bytes else: - self._proccessed_bytes = await get_path_size(self.listener.dir) + self._processed_bytes = await get_path_size(self.listener.dir) def size(self): return get_readable_file_size(self._obj.size) @@ -46,7 +46,7 @@ def eta(self): return get_readable_time(self._obj.eta) try: seconds = ( - self._obj.size - self._proccessed_bytes + self._obj.size - self._processed_bytes ) / self._obj.download_speed return get_readable_time(seconds) except: diff --git a/bot/helper/mirror_leech_utils/telegram_uploader.py b/bot/helper/mirror_leech_utils/telegram_uploader.py index 534f35da253..b96ce5e62ad 100644 --- a/bot/helper/mirror_leech_utils/telegram_uploader.py +++ b/bot/helper/mirror_leech_utils/telegram_uploader.py @@ -253,6 +253,9 @@ async def upload(self, o_files, ft_delete): delete_file = False self._error = "" self._up_path = f_path = ospath.join(dirpath, file_) + if not ospath.exists(self._up_path): + LOGGER.error(f"{self._up_path} not exists! Continue uploading!") + continue if self._up_path in ft_delete: delete_file = True if self._up_path in o_files: diff --git a/bot/modules/bot_settings.py b/bot/modules/bot_settings.py index 294531ceaa8..c7da4637ca7 100644 --- a/bot/modules/bot_settings.py +++ b/bot/modules/bot_settings.py @@ -321,6 +321,8 @@ async def edit_variable(_, message, pre_message, key): value = int(value) elif value.startswith("[") and value.endswith("]"): value = eval(value) + elif value.startswith("{") and value.endswith("}"): + value = eval(value) if key not in [ "CMD_SUFFIX", "OWNER_ID", diff --git a/bot/modules/mirror_leech.py b/bot/modules/mirror_leech.py index aaf63021a17..50055145277 100644 --- a/bot/modules/mirror_leech.py +++ b/bot/modules/mirror_leech.py @@ -153,7 +153,10 @@ async def new_event(self): self.multi = 0 try: - self.ffmpeg_cmds = eval(args["-ff"]) + if args["-ff"].strip().startswith("["): + self.ffmpeg_cmds = eval(args["-ff"]) + else: + self.ffmpeg_cmds = args["-ff"] except Exception as e: self.ffmpeg_cmds = None LOGGER.error(e) diff --git a/bot/modules/restart.py b/bot/modules/restart.py index f7347e6bbb6..4b260d9438d 100644 --- a/bot/modules/restart.py +++ b/bot/modules/restart.py @@ -123,7 +123,7 @@ async def confirm_restart(_, query): "pkill", "-9", "-f", - "gunicorn|aria2c|qbittorrent-nox|ffmpeg|rclone|java|sabnzbdplus", + "gunicorn|aria2c|qbittorrent-nox|ffmpeg|rclone|java|sabnzbdplus|7z", ) proc2 = await create_subprocess_exec("python3", "update.py") await gather(proc1.wait(), proc2.wait()) diff --git a/bot/modules/users_settings.py b/bot/modules/users_settings.py index eb9d4c8fc4c..9c6a9f2f8c0 100644 --- a/bot/modules/users_settings.py +++ b/bot/modules/users_settings.py @@ -314,7 +314,7 @@ async def set_option(_, message, pre_event, option): user_dict["upload_paths"][name] = path value = user_dict["upload_paths"] elif option == "ffmpeg_cmds": - if value.startswith("[") and value.endswith("]"): + if value.startswith("{") and value.endswith("}"): try: value = eval(value) except Exception as e: @@ -645,11 +645,12 @@ async def edit_user_settings(client, query): ) buttons.data_button("Back", f"userset {user_id} back") buttons.data_button("Close", f"userset {user_id} close") - rmsg = """list of lists of ffmpeg commands. You can set multiple ffmpeg commands for all files before upload. Don't write ffmpeg at beginning, start directly with the arguments. + rmsg = """Dict of list values of ffmpeg commands. You can set multiple ffmpeg commands for all files before upload. Don't write ffmpeg at beginning, start directly with the arguments. +Examples: {"subtitle": ["-i mltb.mkv -c copy -c:s srt mltb.mkv", "-i mltb.video -c copy -c:s srt mltb"], "convert": ["-i mltb.m4a -c:a libmp3lame -q:a 2 mltb.mp3", "-i mltb.audio -c:a libmp3lame -q:a 2 mltb.mp3"]} Notes: -1. Add -del to the list which you want from the bot to delete the original files after command run complete! -2. Seed will get disbaled while using this option -Examples: ["-i mltb.mkv -c copy -c:s srt mltb.mkv", "-i mltb.video -c copy -c:s srt mltb", "-i mltb.m4a -c:a libmp3lame -q:a 2 mltb.mp3", "-i mltb.audio -c:a libmp3lame -q:a 2 mltb.mp3"] +- Add `-del` to the list which you want from the bot to delete the original files after command run complete! +- To execute one of those lists in bot for example, you must use -ff subtitle (list key) or -ff convert (list key) +- Seed will get disbaled while using this option Here I will explain how to use mltb.* which is reference to files you want to work on. 1. First cmd: the input is mltb.mkv so this cmd will work only on mkv videos and the output is mltb.mkv also so all outputs is mkv. -del will delete the original media after complete run of the cmd. 2. Second cmd: the input is mltb.video so this cmd will work on all videos and the output is only mltb so the extenstion is same as input files. diff --git a/bot/modules/ytdlp.py b/bot/modules/ytdlp.py index b6f5b1447f7..e638ea743f8 100644 --- a/bot/modules/ytdlp.py +++ b/bot/modules/ytdlp.py @@ -321,7 +321,10 @@ async def new_event(self): self.multi = 0 try: - self.ffmpeg_cmds = eval(args["-ff"]) + if args["-ff"].strip().startswith("["): + self.ffmpeg_cmds = eval(args["-ff"]) + else: + self.ffmpeg_cmds = args["-ff"] except Exception as e: self.ffmpeg_cmds = None LOGGER.error(e) diff --git a/config_sample.py b/config_sample.py index 01f717e9146..b0ced87499f 100644 --- a/config_sample.py +++ b/config_sample.py @@ -20,7 +20,7 @@ YT_DLP_OPTIONS = "" USE_SERVICE_ACCOUNTS = False NAME_SUBSTITUTE = "" -FFMPEG_CMDS = [] +FFMPEG_CMDS = {} # GDrive Tools GDRIVE_ID = "" IS_TEAM_DRIVE = False