From 0c4231205d6cd7dc5e8c43a01f99197449f0f30a Mon Sep 17 00:00:00 2001 From: xiaojin <390540332@163.com> Date: Mon, 6 Jan 2025 21:40:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=20web=20=E9=A1=B5=E9=9D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 70 ++-- alist-sync-web.py | 949 ++++++++++++++++++++++------------------------ requirements.txt | 25 +- 3 files changed, 499 insertions(+), 545 deletions(-) diff --git a/Dockerfile b/Dockerfile index 278cd97..8b86e33 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,48 +1,56 @@ -# 使用 Python 精简版作为基础镜像 -FROM python:3.10-slim AS builder +# 使用 Python Alpine 作为构建镜像 +FROM python:3.10-alpine AS builder # 设置工作目录 WORKDIR /app +# 安装构建依赖 +RUN apk add --no-cache \ + gcc \ + musl-dev \ + python3-dev \ + libffi-dev \ + openssl-dev + # 复制依赖文件 COPY requirements.txt . -# 安装构建依赖和项目依赖 -RUN apt-get update \ - && apt-get install -y --no-install-recommends \ - gcc \ - python3-dev \ - build-essential \ - libssl-dev \ - libffi-dev \ - && pip install --upgrade pip \ - && pip install --no-cache-dir wheel setuptools \ - && pip install --no-cache-dir -r requirements.txt \ - && apt-get purge -y --auto-remove \ - gcc \ - python3-dev \ - build-essential \ - libssl-dev \ - libffi-dev \ - && rm -rf /var/lib/apt/lists/* - -# 复制项目文件 -COPY . . - -# 使用精简版运行时镜像 -FROM python:3.10-slim +# 安装依赖到指定目录 +RUN pip install --no-cache-dir -r requirements.txt --target=/install + +# 使用更小的运行时镜像 +FROM python:3.10-alpine +# 设置工作目录 WORKDIR /app -# 复制已安装的依赖和项目文件 -COPY --from=builder /usr/local/lib/python3.10/site-packages/ /usr/local/lib/python3.10/site-packages/ -COPY --from=builder /app /app +# 复制安装的依赖 +COPY --from=builder /install /usr/local/lib/python3.10/site-packages + +# 只复制必要的项目文件 +COPY alist-sync-web.py alist_sync.py ./ +COPY static ./static +COPY templates ./templates + +# 创建必要的目录 +RUN mkdir -p /app/data/config /app/data/log && \ + # 设置权限 + chmod -R 755 /app/data && \ + # 清理不必要的文件 + find /usr/local/lib/python3.10/site-packages -name "*.pyc" -delete && \ + find /usr/local/lib/python3.10/site-packages -name "__pycache__" -exec rm -r {} + && \ + # 删除测试文件和文档 + find /usr/local/lib/python3.10/site-packages -name "tests" -type d -exec rm -r {} + && \ + find /usr/local/lib/python3.10/site-packages -name "*.txt" -delete && \ + find /usr/local/lib/python3.10/site-packages -name "*.md" -delete # 设置环境变量 -ENV PYTHONUNBUFFERED=1 +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PATH="/app:$PATH" # 暴露端口 EXPOSE 52441 -# 修改启动命令,使用正确的文件名 +# 设置启动命令 CMD ["python", "alist-sync-web.py"] \ No newline at end of file diff --git a/alist-sync-web.py b/alist-sync-web.py index ea06f85..cf0e5f9 100644 --- a/alist-sync-web.py +++ b/alist-sync-web.py @@ -1,24 +1,30 @@ from flask import Flask, render_template, request, jsonify, session, redirect, url_for -from flask_bootstrap import Bootstrap import logging import os import json - -import croniter, datetime, time +import hashlib +import croniter +import datetime +import time from functools import wraps - -# 动态导入alist-sync-ql.py import importlib.util import sys - from typing import Dict, List, Optional, Any - from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from logging.handlers import TimedRotatingFileHandler -import glob -from passlib.hash import sha256_crypt + +# 替换 passlib 的密码哈希功能 +def hash_password(password: str) -> str: + """使用 SHA-256 哈希密码""" + return hashlib.sha256(password.encode()).hexdigest() + + +def verify_password(password: str, hash: str) -> bool: + """验证密码哈希""" + return hash_password(password) == hash + # 创建一个全局的调度器 scheduler = BackgroundScheduler() @@ -38,7 +44,7 @@ def import_from_file(module_name: str, file_path: str) -> Any: try: current_dir = os.path.dirname(os.path.abspath(__file__)) alist_sync = import_from_file('alist_sync', - os.path.join(current_dir, 'alist_sync.py')) + os.path.join(current_dir, 'alist_sync.py')) AlistSync = alist_sync.AlistSync except Exception as e: print(f"导入alist_sync.py失败: {e}") @@ -48,7 +54,6 @@ def import_from_file(module_name: str, file_path: str) -> Any: app = Flask(__name__) app.secret_key = os.urandom(24) # 用于session加密 -Bootstrap(app) # 设置日志记录器 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -71,7 +76,7 @@ def import_from_file(module_name: str, file_path: str) -> Any: "users": [ { "username": "admin", - "password": sha256_crypt.hash("admin") + "password": hash_password("admin") # 使用新的哈希函数 } ] } @@ -125,7 +130,112 @@ def login(): return render_template('login.html') -# 登录接口 +# 优化日志配置 +def setup_logger(): + """配置日志记录器""" + log_dir = os.path.join(app.root_path, 'data/log') + os.makedirs(log_dir, exist_ok=True) + log_file = os.path.join(log_dir, 'alist_sync.log') + + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + + # 文件处理器 + file_handler = TimedRotatingFileHandler( + filename=log_file, + when='midnight', + interval=1, + backupCount=7, + encoding='utf-8' + ) + file_handler.setFormatter(formatter) + + # 控制台处理器 + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + + # 配置根日志记录器 + logger = logging.getLogger() + logger.setLevel(logging.INFO) + logger.handlers.clear() + logger.addHandler(file_handler) + logger.addHandler(console_handler) + + return logger + + +# 优化用户认证相关代码 +class UserManager: + def __init__(self, config_file: str): + self.config_file = config_file + self._ensure_config_exists() + + def _ensure_config_exists(self): + """确保用户配置文件存在""" + if not os.path.exists(self.config_file): + default_config = { + "users": [{ + "username": "admin", + "password": hash_password("admin") + }] + } + self.save_config(default_config) + + def load_config(self) -> Dict: + """加载用户配置""" + try: + with open(self.config_file, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + logger.error(f"加载用户配置失败: {e}") + return {"users": []} + + def save_config(self, config: Dict) -> bool: + """保存用户配置""" + try: + with open(self.config_file, 'w', encoding='utf-8') as f: + json.dump(config, f, indent=2, ensure_ascii=False) + return True + except Exception as e: + logger.error(f"保存用户配置失败: {e}") + return False + + def verify_user(self, username: str, password: str) -> bool: + """验证用户凭据""" + config = self.load_config() + user = next((u for u in config['users'] if u['username'] == username), None) + return user and verify_password(password, user['password']) + + def change_user_password(self, username: str, new_username: str, + old_password: str, new_password: str) -> tuple[bool, str]: + """修改用户密码""" + config = self.load_config() + user = next((u for u in config['users'] if u['username'] == username), None) + + if not user: + return False, "用户不存在" + + if not verify_password(old_password, user['password']): + return False, "原密码错误" + + if username != new_username: + exists_user = next((u for u in config['users'] + if u['username'] == new_username and u != user), None) + if exists_user: + return False, "新用户名已存在" + + user['username'] = new_username + user['password'] = hash_password(new_password) + + if self.save_config(config): + return True, "修改成功" + return False, "保存配置失败" + + +# 创建用户管理器实例 +user_manager = UserManager(USER_CONFIG_FILE) + + +# 优化登录接口 @app.route('/api/login', methods=['POST']) def api_login(): try: @@ -136,21 +246,13 @@ def api_login(): if not username or not password: return jsonify({'code': 400, 'message': '用户名和密码不能为空'}) - # 加载用户配置 - config = load_users() - - # 查找用户并验证密码 - user = next((user for user in config['users'] - if user['username'] == username), None) - - if user and sha256_crypt.verify(password, user['password']): + if user_manager.verify_user(username, password): session['user_id'] = username return jsonify({'code': 200, 'message': '登录成功'}) - else: - return jsonify({'code': 401, 'message': '用户名或密码错误'}) + return jsonify({'code': 401, 'message': '用户名或密码错误'}) except Exception as e: - print(f"登录失败: {e}") + logger.error(f"登录失败: {e}") return jsonify({'code': 500, 'message': '服务器错误'}) @@ -180,58 +282,30 @@ def current_user(): return jsonify({'code': 500, 'message': '服务器错误'}) -# 修改密码接口 +# 优化修改密码接口 @app.route('/api/change-password', methods=['POST']) @login_required def change_password(): try: data = request.get_json() - old_username = data.get('oldUsername') - new_username = data.get('newUsername') - old_password = data.get('oldPassword') - new_password = data.get('newPassword') - - if not all([old_username, new_username, old_password, new_password]): + if not all(data.get(k) for k in ['oldUsername', 'newUsername', 'oldPassword', 'newPassword']): return jsonify({'code': 400, 'message': '所有字段都不能为空'}) - # 加载用户配置 - config = load_users() - - # 查找当前用户 - username = session['user_id'] - user = next((user for user in config['users'] - if user['username'] == username), None) - - if not user: - return jsonify({'code': 404, 'message': '用户不存在'}) - - # 验证原密码 - if not sha256_crypt.verify(old_password, user['password']): - return jsonify({'code': 400, 'message': '原密码错误'}) - - # 如果修改了用户名,确保新用户名不存在 - if old_username != new_username: - exists_user = next((u for u in config['users'] - if u['username'] == new_username - and u != user), None) - if exists_user: - return jsonify({'code': 400, 'message': '新用户名已存在'}) + success, message = user_manager.change_user_password( + data['oldUsername'], + data['newUsername'], + data['oldPassword'], + data['newPassword'] + ) - # 更新用户名和密码 - user['username'] = new_username - user['password'] = sha256_crypt.hash(new_password) - - # 保存配置 - if save_users(config): - # 如果修改了用户名,更新session - if old_username != new_username: - session['user_id'] = new_username - return jsonify({'code': 200, 'message': '修改成功'}) - else: - return jsonify({'code': 500, 'message': '保存配置失败'}) + if success: + if data['oldUsername'] != data['newUsername']: + session['user_id'] = data['newUsername'] + return jsonify({'code': 200, 'message': message}) + return jsonify({'code': 400, 'message': message}) except Exception as e: - print(f"修改密码失败: {e}") + logger.error(f"修改密码失败: {e}") return jsonify({'code': 500, 'message': '服务器错误'}) @@ -247,49 +321,28 @@ def logout(): @login_required def save_base_config(): data = request.get_json() - base_url = data.get('baseUrl') - username = data.get('username') - password = data.get('password') - config_file_path = os.path.join(STORAGE_DIR, 'base_config.json') - try: - with open(config_file_path, 'w') as f: - json.dump({ - "baseUrl": base_url, - "username": username, - "password": password - }, f) + if config_manager.save('base_config', data): return jsonify({"code": 200, "message": "基础配置保存成功"}) - except Exception as e: - return jsonify({"code": 500, "message": f"保存失败: {str(e)}"}) + return jsonify({"code": 500, "message": "保存失败"}) # 查询基础连接配置接口 @app.route('/api/get-base-config', methods=['GET']) @login_required def get_base_config(): - config_file_path = os.path.join(STORAGE_DIR, 'base_config.json') - try: - with open(config_file_path, 'r') as f: - config_data = json.load(f) - return jsonify({"code": 200, "data": config_data}) - except FileNotFoundError: - return jsonify({"code": 404, "message": "配置文件不存在"}) - except Exception as e: - return jsonify({"code": 500, "message": f"读取配置失败: {str(e)}"}) + config = config_manager.load('base_config') + if config: + return jsonify({"code": 200, "data": config}) + return jsonify({"code": 404, "message": "配置文件不存在"}) @app.route('/api/get-sync-config', methods=['GET']) @login_required def get_sync_config(): - config_file_path = os.path.join(STORAGE_DIR, 'sync_config.json') - try: - with open(config_file_path, 'r') as f: - config_data = json.load(f) - return jsonify({"code": 200, "data": config_data}) - except FileNotFoundError: - return jsonify({"code": 404, "message": "配置文件不存在"}) - except Exception as e: - return jsonify({"code": 500, "message": f"读取配置失败: {str(e)}"}) + config = config_manager.load('sync_config') + if config: + return jsonify({"code": 200, "data": config}) + return jsonify({"code": 404, "message": "配置文件不存在"}) # 定义超时处理函数 @@ -302,22 +355,19 @@ def timeout_handler(signum, frame): @login_required def test_connection(): try: - data = request.get_json() - base_url = data.get('baseUrl') - username = data.get('username') - password = data.get('password') - - # 创建 AlistSync 实例 - alist = AlistSync(base_url, username, password) - - # 尝试登录 - if alist.login(): - return jsonify({"code": 200, "message": "连接测试成功"}) - else: - return jsonify({"code": 500, "message": "地址或用户名或密码错误"}) + alist = AlistSync( + data.get('baseUrl'), + data.get('username'), + data.get('password') + ) + return jsonify({ + "code": 200 if alist.login() else 500, + "message": "连接测试成功" if alist.login() else "地址或用户名或密码错误" + }) except Exception as e: + logger.error(f"连接测试失败: {str(e)}") return jsonify({"code": 500, "message": f"连接测试失败: {str(e)}"}) finally: if 'alist' in locals(): @@ -326,446 +376,354 @@ def test_connection(): # 添加以下函数来管理定时任务 def schedule_sync_tasks(): - """ - 从配置文件读取并调度所有同步任务 - """ - try: - # 清除所有现有的任务 - scheduler.remove_all_jobs() - - # 加载同步配置 - sync_config = load_sync_config() - if not sync_config or 'tasks' not in sync_config: - logger.warning("没有找到有效的同步任务配置") - return - - # 为每个任务创建调度 - for task in sync_config['tasks']: - if 'cron' not in task: - logger.warning(f"任务 {task.get('taskName', 'unknown')} 没有配置cron表达式") - continue - - try: - job_id = f"sync_task_{task['id']}" - # 修改这里,直接传递函数而不是调用结果 - scheduler.add_job( - func=execute_sync_task, # 不要加括号调用 - trigger=CronTrigger.from_crontab(task['cron']), - id=job_id, - replace_existing=True, - args=[task['id']] # 通过 args 传递参数 - ) - logger.info(f"成功调度任务 {task['taskName']}, ID: {job_id}, Cron: {task['cron']}") - except Exception as e: - logger.error(f"调度任务 {task.get('taskName', 'unknown')} 失败: {str(e)}") - - except Exception as e: - logger.error(f"调度同步任务时发生错误: {str(e)}") + """从配置文件读取并调度所有同步任务""" + scheduler_manager.reload_tasks() + + +# 优化配置管理 +class ConfigManager: + def __init__(self, storage_dir: str): + self.storage_dir = storage_dir + os.makedirs(storage_dir, exist_ok=True) + + def load(self, config_name: str) -> Optional[Dict]: + """加载配置文件""" + config_file = os.path.join(self.storage_dir, f'{config_name}.json') + try: + with open(config_file, 'r', encoding='utf-8') as f: + return json.load(f) + except FileNotFoundError: + logger.warning(f"配置文件不存在: {config_file}") + return None + except Exception as e: + logger.error(f"读取配置失败: {str(e)}") + return None + + def save(self, config_name: str, data: Dict) -> bool: + """保存配置文件""" + config_file = os.path.join(self.storage_dir, f'{config_name}.json') + try: + with open(config_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + return True + except Exception as e: + logger.error(f"保存配置失败: {str(e)}") + return False -# 修改保存同步配置接口,使其在保存后重新调度任务 -@app.route('/api/save-sync-config', methods=['POST']) -@login_required -def save_sync_config(): - data = request.get_json() - sync_config_file_path = os.path.join(STORAGE_DIR, 'sync_config.json') - try: - with open(sync_config_file_path, 'w') as f: - json.dump(data, f) - # 重新调度所有任务 - schedule_sync_tasks() - return jsonify({"code": 200, "message": "同步配置保存成功并已更新调度"}) - except Exception as e: - return jsonify({"code": 500, "message": f"保存失败: {str(e)}"}) +# 优化任务执行管理 +class TaskManager: + def __init__(self, config_manager: ConfigManager): + self.config_manager = config_manager + def execute_task(self, task_id: Optional[int] = None) -> bool: + """执行同步任务""" + try: + logger.info("开始执行同步任务") -# 假设存储器列表数据也是存储在文件中,这里模拟返回一些示例数据,你可根据实际替换读取逻辑 -@app.route('/api/storages', methods=['GET']) -@login_required -def get_storages(): - try: - config = get_base_config() - data = config.get_json().get("data") + # 加载配置 + sync_config = self.config_manager.load('sync_config') + base_config = self.config_manager.load('base_config') - # data = request.get_json() - base_url = data.get('baseUrl') - username = data.get('username') - password = data.get('password') - # 创建 AlistSync 实例 - alist = AlistSync(base_url, username, password) + if not sync_config or not base_config: + logger.error("配置为空,无法执行同步任务") + return False - # 登录并获取存储列表 - if alist.login(): - storage_list = alist.get_storage_list() - return jsonify({"code": 200, "data": storage_list}) - else: - return jsonify({"code": 500, "message": "获取存储列表失败:登录失败"}) - except Exception as e: - return jsonify({"code": 500, "message": f"获取存储列表失败: {str(e)}"}) - finally: - if 'alist' in locals(): - alist.close() + # 设置基础环境变量 + self._setup_env_vars(base_config) + # 处理任务 + tasks = sync_config.get('tasks', []) + if not tasks: + logger.error("没有配置同步任务") + return False -@app.route('/api/next-run-time', methods=['POST']) -def next_run_time(): - # Cron 表达式解析与时间计算 - try: - data = request.get_json() - cron_expression = data.get('cron') - if not cron_expression: - return jsonify({"code": 400, "message": "缺少cron参数"}), 400 - next_time_list = crontab_run_next_time(cron_expression) - return jsonify({"code": 200, "data": next_time_list}) - except Exception as e: - return jsonify({"code": 500, "message": f"解析出错: {str(e)}"}), 500 + for task in tasks: + if task_id is not None and task_id != task['id']: + continue + self._execute_single_task(task) -def datetime_to_timestamp(timestring, format="%Y-%m-%d %H:%M:%S"): - """ 将普通时间格式转换为时间戳(10位), 形如 '2016-05-05 20:28:54',由format指定 """ - try: - # 转换成时间数组 - timeArray = time.strptime(timestring, format) - except Exception: - raise - else: - # 转换成10位时间戳 - return int(time.mktime(timeArray)) - - -def get_current_timestamp(): - """ 获取本地当前时间戳(10位): Unix timestamp:是从1970年1月1日(UTC/GMT的午夜)开始所经过的秒数,不考虑闰秒 """ - return int(time.mktime(datetime.datetime.now().timetuple())) - - -def timestamp_after_timestamp(timestamp=None, seconds=0, minutes=0, hours=0, days=0): - """ 给定时间戳(10位),计算该时间戳之后多少秒、分钟、小时、天的时间戳(本地时间) """ - # 1. 默认时间戳为当前时间 - timestamp = get_current_timestamp() if timestamp is None else timestamp - # 2. 先转换为datetime - d1 = datetime.datetime.fromtimestamp(timestamp) - # 3. 根据相关时间得到datetime对象并相加给定时间戳的时间 - d2 = d1 + datetime.timedelta(seconds=int(seconds), minutes=int(minutes), hours=int(hours), days=int(days)) - # 4. 返回某时间后的时间戳 - return int(time.mktime(d2.timetuple())) - - -def timestamp_datetime(timestamp, format='%Y-%m-%d %H:%M:%S'): - """ 将时间戳(10位)转换为可读性的时间 """ - # timestamp为传入的值为时间戳(10位整数),如:1332888820 - timestamp = time.localtime(timestamp) - return time.strftime(format, timestamp) - - -def crontab_run_next_time(cron_expression, timeFormat="%Y-%m-%d %H:%M:%S", queryTimes=5): - """计算定时任务下次运行时间 - sched str: 定时任务时间表达式 - timeFormat str: 格式为"%Y-%m-%d %H:%M" - queryTimes int: 查询下次运行次数 - """ - try: - now = datetime.datetime.now() - except ValueError: - raise - else: - # 以当前时间为基准开始计算 - cron = croniter.croniter(cron_expression, now) - return [cron.get_next(datetime.datetime).strftime(timeFormat) for i in range(queryTimes)] - - -# def CrontabRunTime(sched, ctime, timeFormat="%Y-%m-%d %H:%M:%S"): -# """计算定时任务运行次数 -# sched str: 定时任务时间表达式 -# ctime str: 定时任务创建的时间,与timeFormat格式对应 -# timeFormat str: 格式为"%Y-%m-%d %H:%M" -# """ -# try: -# ctimeStrp = datetime.datetime.strptime(ctime, timeFormat) -# except ValueError: -# raise -# else: -# # 根据定时任务创建时间开始计算 -# cron = croniter.croniter(sched, ctimeStrp) -# now = get_current_timestamp() -# num = 0 -# while 1: -# timestring = cron.get_next(datetime.datetime).strftime(timeFormat) -# timestamp = datetime_to_timestamp(timestring, "%Y-%m-%d %H:%M:%S") -# if timestamp > now: -# break -# else: -# num += 1 -# return num - - -# 执行任务接口 -@app.route('/api/run-task', methods=['POST']) -@login_required -def run_task(): - """立即执行同步任务""" - try: - if execute_sync_task(): - return jsonify({"code": 200, "message": "同步任务执行成功"}) - else: - return jsonify({"code": 500, "message": "同步任务执行失败"}) - except Exception as e: - return jsonify({"code": 500, "message": f"执行任务时发生错误: {str(e)}"}) + return True + except Exception as e: + logger.error(f"执行同步任务失败: {str(e)}") + return False -def execute_sync_task(id: int | None = None): - """执行同步任务""" - try: - logger.info("开始执行同步任务") + def _setup_env_vars(self, base_config: Dict): + """设置环境变量""" + # 清除旧的环境变量 + for key in list(os.environ.keys()): + if key.startswith('DIR_PAIRS'): + del os.environ[key] + + # 设置新的环境变量 + os.environ.update({ + 'BASE_URL': base_config.get('baseUrl', ''), + 'USERNAME': base_config.get('username', ''), + 'PASSWORD': base_config.get('password', '') + }) - # 加载同步配置获取任务名称和差异处置策略 - sync_config = load_sync_config() - task_name = "未知任务" - sync_del_action = "none" # 默认值 + def _execute_single_task(self, task: Dict): + """执行单个任务""" + task_name = task.get('taskName', '未知任务') + sync_del_action = task.get('syncDelAction', 'none') + logger.info(f"[{task_name}] 开始处理任务,差异处置策略: {sync_del_action}") - if id is not None and sync_config and 'tasks' in sync_config: - task = next((t for t in sync_config['tasks'] if t['id'] == id), None) - if task: - task_name = task.get('taskName', '未知任务') - sync_del_action = task.get('syncDelAction', 'none') + os.environ['SYNC_DELETE_ACTION'] = sync_del_action + os.environ['EXCLUDE_DIRS'] = task.get('excludeDirs', '') - logger.info(f"任务名称: {task_name}, 差异处置策略: {sync_del_action}") + if task['syncMode'] == 'data': + self._handle_data_sync(task) + elif task['syncMode'] == 'file': + self._handle_file_sync(task) - # 加载基础配置 - base_config = load_base_config() - if not base_config: - logger.error("基础配置为空,无法执行同步任务") - return False + def _handle_data_sync(self, task: Dict): + """处理数据同步模式""" + source = task['sourceStorage'] + sync_dirs = task['syncDirs'] + exclude_dirs = task['excludeDirs'] - # logger.info(f"已加载基础配置: {base_config}") - logger.info(f"已加载基础配置") - - # 清除可能存在的旧环境变量 - for i in range(1, 51): - if f'DIR_PAIRS{i}' in os.environ: - del os.environ[f'DIR_PAIRS{i}'] - if 'DIR_PAIRS' in os.environ: - del os.environ['DIR_PAIRS'] - - # 设置基础环境变量 - os.environ['BASE_URL'] = base_config.get('baseUrl', '') - os.environ['USERNAME'] = base_config.get('username', '') - os.environ['PASSWORD'] = base_config.get('password', '') - - # 加载同步配置 - sync_config = load_sync_config() - if not sync_config: - logger.error("同步配置为空,无法执行同步任务") - return False + if source not in exclude_dirs: + exclude_dirs = f'{source}/{exclude_dirs}' + exclude_dirs = exclude_dirs.replace('//', '/') - # 处理任务列表 - tasks = sync_config.get('tasks', []) - if not tasks: - logger.error("没有配置同步任务") - return False + dir_pairs = [] + for target in task['targetStorages']: + if source != target: + dir_pair = f"{source}/{sync_dirs}:{target}/{sync_dirs}".replace('//', '/') + dir_pairs.append(dir_pair) - for task in tasks: - try: - if id is None or id == task['id']: - task_name = task.get('taskName', '未知任务') - sync_del_action = task.get('syncDelAction', 'none') - logger.info(f"[{task_name}] 开始处理任务,差异处置策略: {sync_del_action}") - - # 更新环境变量中的差异处置策略 - os.environ['SYNC_DELETE_ACTION'] = sync_del_action - - if task['syncMode'] == 'data': - dir_pairs = '' - exclude_dirs = task['excludeDirs'] - os.environ['EXCLUDE_DIRS'] = exclude_dirs - # 数据同步模式:一个源存储同步到多个目标存储 - syncDirs = task['syncDirs'] - source = task['sourceStorage'] - - if source not in exclude_dirs: - exclude_dirs = f'{source}/{exclude_dirs}' - exclude_dirs = exclude_dirs.replace('//', '/') - - for target in task['targetStorages']: - if source != target: - dir_pair = f"{source}/{syncDirs}:{target}/{syncDirs}" - dir_pair = dir_pair.replace('//', '/') - if f'DIR_PAIRS' in os.environ: - os.environ['DIR_PAIRS'] += f";{dir_pair}" - else: - os.environ['DIR_PAIRS'] = dir_pair - - if dir_pairs != '': - dir_pairs += f";{dir_pair}" - else: - dir_pairs = dir_pair - logger.info(f"[{task_name}] 添加同步目录对: {dir_pair}") - # 调用 alist_sync 的 main 函数 - alist_sync.main(dir_pairs, sync_del_action, exclude_dirs) - elif task['syncMode'] == 'file': - dir_pairs = '' - exclude_dirs = task['excludeDirs'] - os.environ['EXCLUDE_DIRS'] = exclude_dirs - # 文件同步模式:多个源路径同步到对应的目标路径 - paths = task['paths'] - for path in paths: - dir_pair = f"{path['srcPath']}:{path['dstPath']}" - if 'DIR_PAIRS' in os.environ: - os.environ['DIR_PAIRS'] += f";{dir_pair}" - else: - os.environ['DIR_PAIRS'] = dir_pair - - if dir_pairs != '': - dir_pairs += f";{dir_pair}" - else: - dir_pairs = dir_pair - - logger.info(f"[{task_name}] 添加同步目录对: {dir_pair}") - alist_sync.main(dir_pairs, sync_del_action, exclude_dirs) - - except KeyError as e: - logger.error(f"[{task_name}] 任务配置错误: {e}") - continue - - # 检查是否有有效的同步目录对 - if 'DIR_PAIRS' not in os.environ or not os.environ['DIR_PAIRS']: - logger.error("没有有效的同步目录对") - return False + if dir_pairs: + os.environ['DIR_PAIRS'] = ';'.join(dir_pairs) + alist_sync.main() - logger.info(f"[{task_name}] 开始执行同步任务,同步目录对: {os.environ['DIR_PAIRS']}") + def _handle_file_sync(self, task: Dict): + """处理文件同步模式""" + dir_pairs = [f"{path['srcPath']}:{path['dstPath']}" for path in task['paths']] + if dir_pairs: + os.environ['DIR_PAIRS'] = ';'.join(dir_pairs) + alist_sync.main() - # 调用 alist_sync 的 main 函数 - alist_sync.main() - logger.info(f"[{task_name}] 同步任务执行完成") - return True - except Exception as e: - logger.error(f"[{task_name}] 执行同步任务失败: {str(e)}") - return False +# 创建管理器实例 +config_manager = ConfigManager(STORAGE_DIR) +task_manager = TaskManager(config_manager) -def load_base_config() -> dict: - """加载基础配置""" - try: - config_file_path = os.path.join(STORAGE_DIR, 'base_config.json') - if not os.path.exists(config_file_path): - logger.warning(f"基础配置文件不存在: {config_file_path}") - return {} - - with open(config_file_path, 'r', encoding='utf-8') as f: - config = json.load(f) - # logger.info(f"成功加载基础配置: {config}") - logger.info(f"成功加载基础配置") - return config - except Exception as e: - logger.error(f"加载基础配置失败: {e}") - return {} +# 优化配置相关接口 +@app.route('/api/save-sync-config', methods=['POST']) +@login_required +def save_sync_config(): + data = request.get_json() + if config_manager.save('sync_config', data): + schedule_sync_tasks() + return jsonify({"code": 200, "message": "同步配置保存成功并已更新调度"}) + return jsonify({"code": 500, "message": "保存失败"}) -def load_sync_config() -> dict: - """加载同步配置""" +@app.route('/api/run-task', methods=['POST']) +@login_required +def run_task(): try: - sync_config_file_path = os.path.join(STORAGE_DIR, 'sync_config.json') - if not os.path.exists(sync_config_file_path): - logger.warning(f"同步配置文件不存在: {sync_config_file_path}") - return {"tasks": []} - - with open(sync_config_file_path, 'r', encoding='utf-8') as f: - config = json.load(f) - logger.info(f"成功加载同步配置: {config}") - return config + task_id = request.get_json().get('id') + if task_manager.execute_task(task_id): + return jsonify({"code": 200, "message": "同步任务执行成功"}) + return jsonify({"code": 500, "message": "同步任务执行失败"}) except Exception as e: - logger.error(f"加载同步配置失败: {e}") - return {"tasks": []} + logger.error(f"执行任务失败: {str(e)}") + return jsonify({"code": 500, "message": f"执行任务时发生错误: {str(e)}"}) -# 在 if __name__ == '__main__': 之前添加初始化调度的代码 -def init_scheduler(): - """ - 初始化调度器并加载现有任务 - """ +# 修改存储列表获取接口 +@app.route('/api/storages', methods=['GET']) +@login_required +def get_storages(): try: - schedule_sync_tasks() - logger.info("调度器初始化完成") - except Exception as e: - logger.error(f"初始化调度器失败: {str(e)}") + config = config_manager.load('base_config') # 使用 config_manager 替代 load_config + if not config: + return jsonify({"code": 404, "message": "基础配置不存在"}) + alist = AlistSync( + config.get('baseUrl'), + config.get('username'), + config.get('password') + ) -# 修改日志配置部分 -def setup_logger(): - """配置日志记录器""" - # 创建日志目录 - log_dir = os.path.join(app.root_path, 'data/log') - os.makedirs(log_dir, exist_ok=True) - - # 设置日志文件路径 - log_file = os.path.join(log_dir, 'alist_sync.log') + if alist.login(): + storage_list = alist.get_storage_list() + return jsonify({"code": 200, "data": storage_list}) + return jsonify({"code": 500, "message": "获取存储列表失败:登录失败"}) - # 创建 TimedRotatingFileHandler - file_handler = TimedRotatingFileHandler( - filename=log_file, - when='midnight', - interval=1, - backupCount=7, - encoding='utf-8' - ) + except Exception as e: + logger.error(f"获取存储列表失败: {str(e)}") + return jsonify({"code": 500, "message": f"获取存储列表失败: {str(e)}"}) + finally: + if 'alist' in locals(): + alist.close() - # 创建控制台处理器 - console_handler = logging.StreamHandler() - # 设置日志格式 - formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') - file_handler.setFormatter(formatter) - console_handler.setFormatter(formatter) +# 优化时间处理相关代码 +class TimeUtils: + @staticmethod + def get_timestamp() -> int: + """获取当前时间戳""" + return int(time.time()) + + @staticmethod + def datetime_to_timestamp(dt_str: str, fmt: str = "%Y-%m-%d %H:%M:%S") -> int: + """时间字符串转时间戳""" + try: + return int(time.mktime(time.strptime(dt_str, fmt))) + except Exception as e: + logger.error(f"时间转换失败: {e}") + raise + + @staticmethod + def timestamp_to_datetime(ts: int, fmt: str = '%Y-%m-%d %H:%M:%S') -> str: + """时间戳转时间字符串""" + return time.strftime(fmt, time.localtime(ts)) + + @staticmethod + def get_next_run_times(cron_expr: str, count: int = 5) -> List[str]: + """获取下次运行时间列表""" + try: + now = datetime.datetime.now() + cron = croniter.croniter(cron_expr, now) + return [ + cron.get_next(datetime.datetime).strftime("%Y-%m-%d %H:%M:%S") + for _ in range(count) + ] + except Exception as e: + logger.error(f"获取运行时间失败: {e}") + raise + + +# 优化调度器管理 +class SchedulerManager: + def __init__(self, config_manager: ConfigManager, task_manager: TaskManager): + self.scheduler = BackgroundScheduler() + self.config_manager = config_manager + self.task_manager = task_manager + + def start(self): + """启动调度器""" + try: + self.scheduler.start() + self.reload_tasks() + logger.info("调度器启动成功") + except Exception as e: + logger.error(f"调度器启动失败: {e}") + raise + + def stop(self): + """停止调度器""" + try: + self.scheduler.shutdown() + logger.info("调度器已停止") + except Exception as e: + logger.error(f"停止调度器失败: {e}") + + def reload_tasks(self): + """重新加载所有任务""" + try: + self.scheduler.remove_all_jobs() + sync_config = self.config_manager.load('sync_config') + + if not sync_config or 'tasks' not in sync_config: + logger.warning("没有找到有效的同步任务配置") + return + + for task in sync_config['tasks']: + self._add_task(task) + + except Exception as e: + logger.error(f"重新加载任务失败: {e}") + + def _add_task(self, task: Dict): + """添加单个任务""" + try: + if 'cron' not in task: + logger.warning(f"任务 {task.get('taskName', 'unknown')} 没有配置cron表达式") + return - # 配置根日志记录器 - logger = logging.getLogger() - logger.setLevel(logging.INFO) + job_id = f"sync_task_{task['id']}" + self.scheduler.add_job( + func=self.task_manager.execute_task, + trigger=CronTrigger.from_crontab(task['cron']), + id=job_id, + replace_existing=True, + args=[task['id']] + ) + logger.info(f"成功添加任务 {task['taskName']}, ID: {job_id}, Cron: {task['cron']}") - # 清除现有的处理器 - logger.handlers.clear() + except Exception as e: + logger.error(f"添加任务失败: {e}") - # 添加处理器 - logger.addHandler(file_handler) - logger.addHandler(console_handler) - return logger +# 创建调度器管理器实例 +scheduler_manager = SchedulerManager(config_manager, task_manager) -# 在 app 创建后调用 -logger = setup_logger() +# 优化相关接口 +@app.route('/api/next-run-time', methods=['POST']) +@login_required +def next_run_time(): + try: + data = request.get_json() + cron_expr = data.get('cron', '').strip() + + # 如果没有提供cron表达式,尝试从配置中获取 + if not cron_expr: + task_id = data.get('id') + if task_id is not None: + sync_config = config_manager.load('sync_config') + if sync_config and 'tasks' in sync_config: + task = next((t for t in sync_config['tasks'] if t['id'] == task_id), None) + if task and 'cron' in task: + cron_expr = task['cron'] + + if not cron_expr: + return jsonify({"code": 400, "message": "缺少cron参数"}) + + next_times = TimeUtils.get_next_run_times(cron_expr) + return jsonify({ + "code": 200, + "data": next_times, + "cron": cron_expr # 返回使用的cron表达式 + }) + except Exception as e: + logger.error(f"解析cron表达式失败: {e}") + return jsonify({"code": 500, "message": f"解析出错: {str(e)}"}) -# 添加获取日志的接口 +# 将日志接口移到主函数之前 @app.route('/api/logs', methods=['GET']) @login_required def get_logs(): try: - # 获取请求参数中的日期 date_str = request.args.get('date') + log_dir = os.path.join(app.root_path, 'data/log') - # 构建日志文件路径 - log_dir = os.path.join(app.root_path,'data/log') - - # 如果是请求当前日志或没有指定日期 if not date_str or date_str == 'current': log_file = os.path.join(log_dir, 'alist_sync.log') date_str = 'current' else: - # 历史日志文件 log_file = os.path.join(log_dir, f'alist_sync.log.{date_str}') - logs = [] if os.path.exists(log_file): with open(log_file, 'r', encoding='utf-8') as f: content = f.read() - logs.append({ - 'date': date_str, - 'content': content + return jsonify({ + 'code': 200, + 'data': [{ + 'date': date_str, + 'content': content + }] }) - return jsonify({ - 'code': 200, - 'data': logs + 'code': 404, + 'message': '日志文件不存在' }) except Exception as e: @@ -776,6 +734,15 @@ def get_logs(): }) +# 主函数 if __name__ == '__main__': - init_scheduler() - app.run(host='0.0.0.0', port=52441, debug=False) + try: + # 启动调度器 + scheduler_manager.start() + # 启动Web服务 + app.run(host='0.0.0.0', port=52441, debug=False) + except Exception as e: + logger.error(f"启动失败: {e}") + finally: + # 确保调度器正确关闭 + scheduler_manager.stop() diff --git a/requirements.txt b/requirements.txt index 4de78d9..287cc02 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,29 +1,8 @@ -# Web 框架 +# 核心依赖 Flask==3.0.0 -Flask-Bootstrap==3.3.7.1 - -# 定时任务 APScheduler==3.10.4 - -# HTTP 客户端 -requests==2.31.0 - -# 日期时间处理 -python-dateutil==2.8.2 - -# 工具库 -PyYAML==6.0.1 croniter==2.0.1 -# 加密 (使用 passlib 替代 bcrypt) -passlib==1.7.4 - -# UI 框架依赖 +# Flask 必需的依赖 Werkzeug==3.0.1 Jinja2==3.1.2 - -# 日志处理 -colorlog==6.7.0 - -# 文件系统操作 -pathlib==1.0.1 \ No newline at end of file