From a440c6ae232b5bbd0d530b61518a5a765b84c072 Mon Sep 17 00:00:00 2001 From: Blondel MONDESIR Date: Tue, 24 Sep 2024 13:05:10 -0400 Subject: [PATCH 1/3] Refactor download.py Uses SQLAlchemy... --- cps/tasks/download.py | 269 +++++++++++++++++++++++++----------------- 1 file changed, 164 insertions(+), 105 deletions(-) diff --git a/cps/tasks/download.py b/cps/tasks/download.py index d5d28945d8..a7c5c77681 100644 --- a/cps/tasks/download.py +++ b/cps/tasks/download.py @@ -2,24 +2,29 @@ import re import requests import select -import sqlite3 from datetime import datetime from flask_babel import lazy_gettext as N_, gettext as _ -from cps.constants import XKLB_DB_FILE from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS, STAT_FAIL, STAT_STARTED, STAT_WAITING -from cps.subproc_wrapper import process_open +from cps.services.xb_utils import DatabaseService, Settings, execute_subprocess +from cps.xb import Media +from sqlalchemy.orm import Session from .. import logger from time import sleep log = logger.create() class TaskDownload(CalibreTask): - def __init__(self, task_message, media_url, original_url, current_user_name, shelf_id, duration, live_status): + """Task class for downloading media.""" + + def __init__(self, session: Session, task_message, media_url, original_url, current_user_name, shelf_id, duration, live_status): super(TaskDownload, self).__init__(task_message) + self.session = session + self.db_service = DatabaseService(self.session) self.message = task_message self.media_url = media_url self.media_url_link = f'{media_url}' + self.media_id = None self.original_url = original_url self.current_user_name = current_user_name self.shelf_id = shelf_id @@ -30,124 +35,178 @@ def __init__(self, task_message, media_url, original_url, current_user_name, she self.progress = 0 def run(self, worker_thread): - """Run the download task""" + """Runs the download task.""" self.worker_thread = worker_thread - self.start_time = self.end_time = datetime.now() + log.info("Starting download task for URL: %s", self.media_url) + self.start_time = datetime.now() self.stat = STAT_STARTED self.progress = 0 - lb_executable = os.getenv("LB_WRAPPER", "lb-wrapper") + lb_executable = Settings.LB_WRAPPER if self.media_url: subprocess_args = [lb_executable, "dl", self.media_url] - log.info("Subprocess args: %s", subprocess_args) + log.debug("Subprocess args: %s", subprocess_args) # Execute the download process using process_open try: - p = process_open(subprocess_args, newlines=True) - - # Define the patterns for the subprocess output - # Equivalent Regex's: https://github.com/iiab/calibre-web/blob/8684ffb491244e15ab927dfb390114240e483eb3/scripts/lb-wrapper#L59-L60 - pattern_progress = r"^downloading" - pattern_success = r"\[{}\]:".format(self.media_url) - - complete_progress_cycle = 0 - - last_progress_time = datetime.now() - timeout = 120 # seconds - - self.message = f"Downloading {self.media_url_link}..." - if self.live_status == "was_live": - self.message += f" (formerly live video, length/duration {self.duration})" - while p.poll() is None: - self.end_time = datetime.now() - # Check if there's data available to read - rlist, _, _ = select.select([p.stdout], [], [], 0.1) - if rlist: - line = p.stdout.readline() - if line: - if re.search(pattern_success, line) or complete_progress_cycle == 4: - # 2024-01-10: 99% (a bit arbitrary) is explained here... - # https://github.com/iiab/calibre-web/pull/88#issuecomment-1885916421 - self.progress = 0.99 - break - elif re.search(pattern_progress, line): - percentage = int(re.search(r'\d+', line).group()) - if percentage < 100: - self.progress = min(0.99, (complete_progress_cycle + (percentage / 100)) / 4) - if percentage == 100: - complete_progress_cycle += 1 - last_progress_time = datetime.now() - else: - elapsed_time = (datetime.now() - last_progress_time).total_seconds() - if elapsed_time >= timeout: - self.message = f"{self.media_url_link} is taking longer than expected. It could be a stuck download due to unavailable fragments (yt-dlp/yt-dlp#2137) and/or an error in xklb's media_check. Please wait as we keep trying. See #223 for more info." - sleep(0.1) - + p = execute_subprocess(subprocess_args) + self._monitor_subprocess(p) p.wait() - - # Database operations - with sqlite3.connect(XKLB_DB_FILE) as conn: - try: - requested_file = conn.execute("SELECT path FROM media WHERE webpath = ? AND path NOT LIKE 'http%'", (self.media_url,)).fetchone()[0] - - # Abort if there is not a path - if not requested_file: - log.info("No path found in the database") - error = conn.execute("SELECT error, webpath FROM media WHERE error IS NOT NULL").fetchone() - if error: - log.error("[xklb] An error occurred while trying to download %s: %s", error[1], error[0]) - self.message = f"{error[1]} failed to download: {error[0]}" - else: - log.error("%s failed to download: No path or error found in the database (likely the video failed due to unavailable fragments?)", self.media_url) - self.message = f"{self.media_url_link} failed to download: No path or error found in the database (likely the video failed due to unavailable fragments?)" - media_id = conn.execute("SELECT id FROM media WHERE webpath = ?", (self.media_url,)).fetchone()[0] - conn.execute("DELETE FROM media WHERE webpath = ?", (self.media_url,)) - conn.execute("DELETE FROM captions WHERE media_id = ?", (media_id,)) - return - except sqlite3.Error as db_error: - log.error("An error occurred while trying to connect to the database: %s", db_error) - self.message = f"{self.media_url_link} failed to download: {db_error}" - - self.message = self.message + "\n" + f"Almost done..." - response = requests.get(self.original_url, params={"requested_file": requested_file, "current_user_name": self.current_user_name, "shelf_id": self.shelf_id}) - if response.status_code == 200: - log.info("Successfully sent the requested file to %s", self.original_url) - file_downloaded = response.json()["file_downloaded"] - self.message = f"Successfully downloaded {self.media_url_link} to

{file_downloaded}" - new_video_path = response.json()["new_book_path"] - new_video_path = next((os.path.join(new_video_path, file) for file in os.listdir(new_video_path) if file.endswith((".webm", ".mp4"))), None) - # 2024-02-17: Dedup Design Evolving... https://github.com/iiab/calibre-web/pull/125 - conn.execute("UPDATE media SET path = ? WHERE webpath = ?", (new_video_path, self.media_url)) - conn.execute("UPDATE media SET webpath = ? WHERE path = ?", (f"{self.media_url}×tamp={int(datetime.now().timestamp())}", new_video_path)) - self.progress = 1.0 - else: - log.error("Failed to send the requested file to %s", self.original_url) - self.message = f"{self.media_url_link} failed to download: {response.status_code} {response.reason}" - - conn.close() - + self._post_process_download() except Exception as e: log.error("An error occurred during the subprocess execution: %s", e) - self.message = f"{self.media_url_link} failed to download: {self.read_error_from_database()}" - + self.message = f"{self.media_url_link} failed to download: {self.db_service.read_error_from_database(self.media_url)}" + self.stat = STAT_FAIL + return finally: self.end_time = datetime.now() - if p.returncode == 0 or self.progress == 1.0: - self.stat = STAT_FINISH_SUCCESS - log.info("Download task for %s completed successfully", self.media_url) - else: - self.stat = STAT_FAIL + if p.returncode == 0 or self.progress == 1.0: + self.stat = STAT_FINISH_SUCCESS + log.info("Download task for %s completed successfully.", self.media_url) + else: + self.stat = STAT_FAIL + log.error("Download task for %s failed.", self.media_url) + else: + log.warning("No media URL provided - skipping download task.") + self.stat = STAT_FAIL + self.message = "No media URL provided." + self.end_time = datetime.now() + + def _monitor_subprocess(self, process): + """Monitors the subprocess for progress and handles timeouts.""" + log.debug("Monitoring subprocess for download progress.") + pattern_progress = r"^downloading" + pattern_success = re.escape(self.media_url) + + complete_progress_cycle = 0 + last_progress_time = datetime.now() + timeout = Settings.TIMEOUT + + self.message = f"Downloading {self.media_url_link}..." + if self.live_status == "was_live": + self.message += f" (formerly live video, length/duration {self.duration})" + + while process.poll() is None: + self.end_time = datetime.now() + rlist, _, _ = select.select([process.stdout], [], [], 0.1) + if rlist: + line = process.stdout.readline() + if line: + if re.search(pattern_success, line) or complete_progress_cycle == 4: + self.progress = 0.99 + log.debug("Download progress reached 99%.") + break + elif re.search(pattern_progress, line): + percentage_match = re.search(r'\d+', line) + if percentage_match: + percentage = int(percentage_match.group()) + if percentage < 100: + self.progress = min(0.99, (complete_progress_cycle + (percentage / 100)) / 4) + if percentage == 100: + complete_progress_cycle += 1 + last_progress_time = datetime.now() + log.debug("Completed progress cycle %d.", complete_progress_cycle) + else: + elapsed_time = (datetime.now() - last_progress_time).total_seconds() + if elapsed_time >= timeout: + self.message = f"{self.media_url_link} is taking longer than expected. It could be a stuck download due to unavailable fragments. Please wait as we keep trying." + log.warning("Download taking longer than expected for URL: %s", self.media_url) + sleep(0.1) + + def _post_process_download(self): + """Handles post-download operations.""" + log.debug("Post-processing after download.") + try: + with self.session.begin(): + # Fetch the media entry from the database + media_entry = self.session.query(Media).filter( + Media.webpath == self.media_url, + ~Media.path.like('http%') + ).first() + + if media_entry: + self.media_id = media_entry.id + requested_file = media_entry.path + + # Abort if there is not a path + if not requested_file: + self._handle_no_path(media_entry) + return + else: + self._handle_no_media_entry() + return + + self.message += "\nAlmost done..." + response = requests.get(self.original_url, params={ + "requested_file": requested_file, + "current_user_name": self.current_user_name, + "shelf_id": self.shelf_id, + "media_id": self.media_id + }) + if response.status_code == 200: + log.info("Successfully sent the requested file to %s", self.original_url) + file_downloaded = response.json().get("file_downloaded") + self.message = f"Successfully downloaded {self.media_url_link} to

{file_downloaded}" + new_video_path = response.json().get("new_book_path") + if new_video_path: + new_video_path = next( + (os.path.join(new_video_path, file) for file in os.listdir(new_video_path) if file.endswith((".webm", ".mp4"))), + None + ) + + # Update media path and webpath in the database + media_entry.path = new_video_path + media_entry.webpath = f"{self.media_url}×tamp={int(datetime.now().timestamp())}" + self.session.commit() + self.progress = 1.0 + log.info("Media entry updated in the database.") + else: + log.error("new_book_path not found in the response") + self.message = f"{self.media_url_link} failed to download: 'new_book_path' not found in response" + else: + log.error("Failed to send the requested file to %s", self.original_url) + self.message = f"{self.media_url_link} failed to download: {response.status_code} {response.reason}" + + except Exception as e: + self.session.rollback() + log.error("An error occurred during post-download operations: %s", e) + self.message = f"{self.media_url_link} failed to download: {e}" + self.stat = STAT_FAIL + + def _handle_no_path(self, media_entry): + """Handles the case when no path is found in the media entry.""" + log.warning("No path found in the media entry for media ID: %s", media_entry.id) + error_message = self.db_service.read_error_from_database(self.media_url) + if error_message != "No error message found in database": + log.error("An error occurred while trying to download %s: %s", self.media_url, error_message) + self.message = f"{self.media_url_link} failed to download: {error_message}" + else: + log.error("%s failed to download: No path or error found in the database.", self.media_url) + self.message = f"{self.media_url_link} failed to download: No path or error found in the database." + + # Delete media and captions entries + self.db_service.delete_media_and_captions(self.media_id, self.media_url) + + def _handle_no_media_entry(self): + """Handles the case when no media entry is found in the database.""" + log.warning("No media entry found in the database for webpath %s", self.media_url) + error_message = self.db_service.read_error_from_database(self.media_url) + if error_message != "No error message found in database": + log.error("An error occurred while trying to download %s: %s", self.media_url, error_message) + self.message = f"{self.media_url_link} failed to download: {error_message}" else: - log.info("No media URL provided - skipping download task") - - def read_error_from_database(self): - """Read the error from the database""" - with sqlite3.connect(XKLB_DB_FILE) as conn: - error = conn.execute("SELECT error FROM media WHERE webpath = ?", (self.media_url,)).fetchone()[0] - conn.close() - return error + log.error("%s failed to download: No path or error found in the database.", self.media_url) + self.message = f"{self.media_url_link} failed to download: No path or error found in the database." + + media_entry = self.session.query(Media.id).filter(Media.webpath == self.media_url).first() + if media_entry: + self.media_id = media_entry.id + # Delete media and captions entries + self.db_service.delete_media_and_captions(self.media_id, self.media_url) + else: + self.media_id = None @property def name(self): From 95a0d554e50fef881f129b4ed2d929249dbffd17 Mon Sep 17 00:00:00 2001 From: Blondel MONDESIR Date: Tue, 24 Sep 2024 22:20:31 -0400 Subject: [PATCH 2/3] Use right session --- cps/tasks/download.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cps/tasks/download.py b/cps/tasks/download.py index a7c5c77681..099989f7a3 100644 --- a/cps/tasks/download.py +++ b/cps/tasks/download.py @@ -7,8 +7,7 @@ from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS, STAT_FAIL, STAT_STARTED, STAT_WAITING from cps.services.xb_utils import DatabaseService, Settings, execute_subprocess -from cps.xb import Media -from sqlalchemy.orm import Session +from cps.xb import XKLBDB, Media from .. import logger from time import sleep @@ -17,9 +16,10 @@ class TaskDownload(CalibreTask): """Task class for downloading media.""" - def __init__(self, session: Session, task_message, media_url, original_url, current_user_name, shelf_id, duration, live_status): + def __init__(self, task_message, media_url, original_url, current_user_name, shelf_id, duration, live_status): super(TaskDownload, self).__init__(task_message) - self.session = session + db = XKLBDB() + self.session = db.get_session() self.db_service = DatabaseService(self.session) self.message = task_message self.media_url = media_url From 81a15fdc0a13163e5c9e721a61f5072700daefa3 Mon Sep 17 00:00:00 2001 From: Blondel MONDESIR Date: Fri, 1 Nov 2024 23:53:25 -0400 Subject: [PATCH 3/3] Update download.py to close session when done --- cps/tasks/download.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cps/tasks/download.py b/cps/tasks/download.py index 099989f7a3..12e1e687f2 100644 --- a/cps/tasks/download.py +++ b/cps/tasks/download.py @@ -174,6 +174,8 @@ def _post_process_download(self): log.error("An error occurred during post-download operations: %s", e) self.message = f"{self.media_url_link} failed to download: {e}" self.stat = STAT_FAIL + finally: + self.db_service.close_session() def _handle_no_path(self, media_entry): """Handles the case when no path is found in the media entry."""