diff --git a/cps/tasks/download.py b/cps/tasks/download.py
index d5d28945d8..12e1e687f2 100644
--- a/cps/tasks/download.py
+++ b/cps/tasks/download.py
@@ -2,24 +2,29 @@
import re
import requests
import select
-import sqlite3
from datetime import datetime
from flask_babel import lazy_gettext as N_, gettext as _
-from cps.constants import XKLB_DB_FILE
from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS, STAT_FAIL, STAT_STARTED, STAT_WAITING
-from cps.subproc_wrapper import process_open
+from cps.services.xb_utils import DatabaseService, Settings, execute_subprocess
+from cps.xb import XKLBDB, Media
from .. import logger
from time import sleep
log = logger.create()
class TaskDownload(CalibreTask):
+ """Task class for downloading media."""
+
def __init__(self, task_message, media_url, original_url, current_user_name, shelf_id, duration, live_status):
super(TaskDownload, self).__init__(task_message)
+ db = XKLBDB()
+ self.session = db.get_session()
+ self.db_service = DatabaseService(self.session)
self.message = task_message
self.media_url = media_url
self.media_url_link = f'{media_url}'
+ self.media_id = None
self.original_url = original_url
self.current_user_name = current_user_name
self.shelf_id = shelf_id
@@ -30,124 +35,180 @@ def __init__(self, task_message, media_url, original_url, current_user_name, she
self.progress = 0
def run(self, worker_thread):
- """Run the download task"""
+ """Runs the download task."""
self.worker_thread = worker_thread
- self.start_time = self.end_time = datetime.now()
+ log.info("Starting download task for URL: %s", self.media_url)
+ self.start_time = datetime.now()
self.stat = STAT_STARTED
self.progress = 0
- lb_executable = os.getenv("LB_WRAPPER", "lb-wrapper")
+ lb_executable = Settings.LB_WRAPPER
if self.media_url:
subprocess_args = [lb_executable, "dl", self.media_url]
- log.info("Subprocess args: %s", subprocess_args)
+ log.debug("Subprocess args: %s", subprocess_args)
# Execute the download process using process_open
try:
- p = process_open(subprocess_args, newlines=True)
-
- # Define the patterns for the subprocess output
- # Equivalent Regex's: https://github.com/iiab/calibre-web/blob/8684ffb491244e15ab927dfb390114240e483eb3/scripts/lb-wrapper#L59-L60
- pattern_progress = r"^downloading"
- pattern_success = r"\[{}\]:".format(self.media_url)
-
- complete_progress_cycle = 0
-
- last_progress_time = datetime.now()
- timeout = 120 # seconds
-
- self.message = f"Downloading {self.media_url_link}..."
- if self.live_status == "was_live":
- self.message += f" (formerly live video, length/duration {self.duration})"
- while p.poll() is None:
- self.end_time = datetime.now()
- # Check if there's data available to read
- rlist, _, _ = select.select([p.stdout], [], [], 0.1)
- if rlist:
- line = p.stdout.readline()
- if line:
- if re.search(pattern_success, line) or complete_progress_cycle == 4:
- # 2024-01-10: 99% (a bit arbitrary) is explained here...
- # https://github.com/iiab/calibre-web/pull/88#issuecomment-1885916421
- self.progress = 0.99
- break
- elif re.search(pattern_progress, line):
- percentage = int(re.search(r'\d+', line).group())
- if percentage < 100:
- self.progress = min(0.99, (complete_progress_cycle + (percentage / 100)) / 4)
- if percentage == 100:
- complete_progress_cycle += 1
- last_progress_time = datetime.now()
- else:
- elapsed_time = (datetime.now() - last_progress_time).total_seconds()
- if elapsed_time >= timeout:
- self.message = f"{self.media_url_link} is taking longer than expected. It could be a stuck download due to unavailable fragments (yt-dlp/yt-dlp#2137) and/or an error in xklb's media_check. Please wait as we keep trying. See #223 for more info."
- sleep(0.1)
-
+ p = execute_subprocess(subprocess_args)
+ self._monitor_subprocess(p)
p.wait()
-
- # Database operations
- with sqlite3.connect(XKLB_DB_FILE) as conn:
- try:
- requested_file = conn.execute("SELECT path FROM media WHERE webpath = ? AND path NOT LIKE 'http%'", (self.media_url,)).fetchone()[0]
-
- # Abort if there is not a path
- if not requested_file:
- log.info("No path found in the database")
- error = conn.execute("SELECT error, webpath FROM media WHERE error IS NOT NULL").fetchone()
- if error:
- log.error("[xklb] An error occurred while trying to download %s: %s", error[1], error[0])
- self.message = f"{error[1]} failed to download: {error[0]}"
- else:
- log.error("%s failed to download: No path or error found in the database (likely the video failed due to unavailable fragments?)", self.media_url)
- self.message = f"{self.media_url_link} failed to download: No path or error found in the database (likely the video failed due to unavailable fragments?)"
- media_id = conn.execute("SELECT id FROM media WHERE webpath = ?", (self.media_url,)).fetchone()[0]
- conn.execute("DELETE FROM media WHERE webpath = ?", (self.media_url,))
- conn.execute("DELETE FROM captions WHERE media_id = ?", (media_id,))
- return
- except sqlite3.Error as db_error:
- log.error("An error occurred while trying to connect to the database: %s", db_error)
- self.message = f"{self.media_url_link} failed to download: {db_error}"
-
- self.message = self.message + "\n" + f"Almost done..."
- response = requests.get(self.original_url, params={"requested_file": requested_file, "current_user_name": self.current_user_name, "shelf_id": self.shelf_id})
- if response.status_code == 200:
- log.info("Successfully sent the requested file to %s", self.original_url)
- file_downloaded = response.json()["file_downloaded"]
- self.message = f"Successfully downloaded {self.media_url_link} to <br>{file_downloaded}"
- new_video_path = response.json()["new_book_path"]
- new_video_path = next((os.path.join(new_video_path, file) for file in os.listdir(new_video_path) if file.endswith((".webm", ".mp4"))), None)
- # 2024-02-17: Dedup Design Evolving... https://github.com/iiab/calibre-web/pull/125
- conn.execute("UPDATE media SET path = ? WHERE webpath = ?", (new_video_path, self.media_url))
- conn.execute("UPDATE media SET webpath = ? WHERE path = ?", (f"{self.media_url}&timestamp={int(datetime.now().timestamp())}", new_video_path))
- self.progress = 1.0
- else:
- log.error("Failed to send the requested file to %s", self.original_url)
- self.message = f"{self.media_url_link} failed to download: {response.status_code} {response.reason}"
-
- conn.close()
-
+ self._post_process_download()
except Exception as e:
log.error("An error occurred during the subprocess execution: %s", e)
- self.message = f"{self.media_url_link} failed to download: {self.read_error_from_database()}"
-
+ self.message = f"{self.media_url_link} failed to download: {self.db_service.read_error_from_database(self.media_url)}"
+ self.stat = STAT_FAIL
+ return
finally:
self.end_time = datetime.now()
- if p.returncode == 0 or self.progress == 1.0:
- self.stat = STAT_FINISH_SUCCESS
- log.info("Download task for %s completed successfully", self.media_url)
- else:
- self.stat = STAT_FAIL
+ if p.returncode == 0 or self.progress == 1.0:
+ self.stat = STAT_FINISH_SUCCESS
+ log.info("Download task for %s completed successfully.", self.media_url)
+ else:
+ self.stat = STAT_FAIL
+ log.error("Download task for %s failed.", self.media_url)
+ else:
+ log.warning("No media URL provided - skipping download task.")
+ self.stat = STAT_FAIL
+ self.message = "No media URL provided."
+ self.end_time = datetime.now()
+
+ def _monitor_subprocess(self, process):
+ """Monitors the subprocess for progress and handles timeouts."""
+ log.debug("Monitoring subprocess for download progress.")
+ pattern_progress = r"^downloading"
+ pattern_success = re.escape(self.media_url)
+
+ complete_progress_cycle = 0
+ last_progress_time = datetime.now()
+ timeout = Settings.TIMEOUT
+
+ self.message = f"Downloading {self.media_url_link}..."
+ if self.live_status == "was_live":
+ self.message += f" (formerly live video, length/duration {self.duration})"
+
+ while process.poll() is None:
+ self.end_time = datetime.now()
+ rlist, _, _ = select.select([process.stdout], [], [], 0.1)
+ if rlist:
+ line = process.stdout.readline()
+ if line:
+ if re.search(pattern_success, line) or complete_progress_cycle == 4:
+ self.progress = 0.99
+ log.debug("Download progress reached 99%.")
+ break
+ elif re.search(pattern_progress, line):
+ percentage_match = re.search(r'\d+', line)
+ if percentage_match:
+ percentage = int(percentage_match.group())
+ if percentage < 100:
+ self.progress = min(0.99, (complete_progress_cycle + (percentage / 100)) / 4)
+ if percentage == 100:
+ complete_progress_cycle += 1
+ last_progress_time = datetime.now()
+ log.debug("Completed progress cycle %d.", complete_progress_cycle)
+ else:
+ elapsed_time = (datetime.now() - last_progress_time).total_seconds()
+ if elapsed_time >= timeout:
+ self.message = f"{self.media_url_link} is taking longer than expected. It could be a stuck download due to unavailable fragments. Please wait as we keep trying."
+ log.warning("Download taking longer than expected for URL: %s", self.media_url)
+ sleep(0.1)
+
+ def _post_process_download(self):
+ """Handles post-download operations."""
+ log.debug("Post-processing after download.")
+ try:
+ with self.session.begin():
+ # Fetch the media entry from the database
+ media_entry = self.session.query(Media).filter(
+ Media.webpath == self.media_url,
+ ~Media.path.like('http%')
+ ).first()
+
+ if media_entry:
+ self.media_id = media_entry.id
+ requested_file = media_entry.path
+
+ # Abort if there is not a path
+ if not requested_file:
+ self._handle_no_path(media_entry)
+ return
+ else:
+ self._handle_no_media_entry()
+ return
+
+ self.message += "\nAlmost done..."
+ response = requests.get(self.original_url, params={
+ "requested_file": requested_file,
+ "current_user_name": self.current_user_name,
+ "shelf_id": self.shelf_id,
+ "media_id": self.media_id
+ })
+ if response.status_code == 200:
+ log.info("Successfully sent the requested file to %s", self.original_url)
+ file_downloaded = response.json().get("file_downloaded")
+ self.message = f"Successfully downloaded {self.media_url_link} to <br>{file_downloaded}"
+ new_video_path = response.json().get("new_book_path")
+ if new_video_path:
+ new_video_path = next(
+ (os.path.join(new_video_path, file) for file in os.listdir(new_video_path) if file.endswith((".webm", ".mp4"))),
+ None
+ )
+
+ # Update media path and webpath in the database
+ media_entry.path = new_video_path
+ media_entry.webpath = f"{self.media_url}&timestamp={int(datetime.now().timestamp())}"
+ self.session.commit()
+ self.progress = 1.0
+ log.info("Media entry updated in the database.")
+ else:
+ log.error("new_book_path not found in the response")
+ self.message = f"{self.media_url_link} failed to download: 'new_book_path' not found in response"
+ else:
+ log.error("Failed to send the requested file to %s", self.original_url)
+ self.message = f"{self.media_url_link} failed to download: {response.status_code} {response.reason}"
+
+ except Exception as e:
+ self.session.rollback()
+ log.error("An error occurred during post-download operations: %s", e)
+ self.message = f"{self.media_url_link} failed to download: {e}"
+ self.stat = STAT_FAIL
+ finally:
+ self.db_service.close_session()
+
+ def _handle_no_path(self, media_entry):
+ """Handles the case when no path is found in the media entry."""
+ log.warning("No path found in the media entry for media ID: %s", media_entry.id)
+ error_message = self.db_service.read_error_from_database(self.media_url)
+ if error_message != "No error message found in database":
+ log.error("An error occurred while trying to download %s: %s", self.media_url, error_message)
+ self.message = f"{self.media_url_link} failed to download: {error_message}"
+ else:
+ log.error("%s failed to download: No path or error found in the database.", self.media_url)
+ self.message = f"{self.media_url_link} failed to download: No path or error found in the database."
+
+ # Delete media and captions entries
+ self.db_service.delete_media_and_captions(self.media_id, self.media_url)
+
+ def _handle_no_media_entry(self):
+ """Handles the case when no media entry is found in the database."""
+ log.warning("No media entry found in the database for webpath %s", self.media_url)
+ error_message = self.db_service.read_error_from_database(self.media_url)
+ if error_message != "No error message found in database":
+ log.error("An error occurred while trying to download %s: %s", self.media_url, error_message)
+ self.message = f"{self.media_url_link} failed to download: {error_message}"
else:
- log.info("No media URL provided - skipping download task")
-
- def read_error_from_database(self):
- """Read the error from the database"""
- with sqlite3.connect(XKLB_DB_FILE) as conn:
- error = conn.execute("SELECT error FROM media WHERE webpath = ?", (self.media_url,)).fetchone()[0]
- conn.close()
- return error
+ log.error("%s failed to download: No path or error found in the database.", self.media_url)
+ self.message = f"{self.media_url_link} failed to download: No path or error found in the database."
+
+ media_entry = self.session.query(Media.id).filter(Media.webpath == self.media_url).first()
+ if media_entry:
+ self.media_id = media_entry.id
+ # Delete media and captions entries
+ self.db_service.delete_media_and_captions(self.media_id, self.media_url)
+ else:
+ self.media_id = None
@property
def name(self):