diff --git a/.github/workflows/update-mirror.yml b/.github/workflows/update-mirror.yml index 536c8a7..9734eca 100644 --- a/.github/workflows/update-mirror.yml +++ b/.github/workflows/update-mirror.yml @@ -33,7 +33,6 @@ jobs: run: | sudo -E apt-get -qq update sudo -E apt-get -qq install build-essential libncurses5-dev binutils bzip2 coreutils gawk gettext git patch zlib1g-dev subversion git-core gcc g++ p7zip-full libssl-dev autoconf automake libtool autopoint curl wget vim nano python3 python3-pip xz-utils tar unzip rsync - sudo -E -H pip3 install -r aliyundrive-uploader/requirements.txt sudo -E apt-get -qq clean - name: Checkout ImmortalWrt Source Tree on Branch ${{ matrix.branch }} @@ -104,24 +103,6 @@ jobs: [ -d "uploader-keys" ] || mkdir -p "uploader-keys" cp -f "$UPLOADER_CONF" "uploader-keys/$UPLOADER_TYPE.json" - - name: Upload Files to AliyunDrive - env: - UPLOADER_TYPE: "alidrive" - ALI_DRIVE_ID: ${{ secrets.ALI_DRIVE_ID }} - ALI_REFRESH_TOKEN: ${{ secrets.ALI_REFRESH_TOKEN }} - run: | - ./uploader-cache/detect-upload-files.sh - [ "$(ls -A "package-sources"/* | wc -l)" -gt "0" ] || exit 0 - pushd aliyundrive-uploader - [ -f "../uploader-keys/$UPLOADER_TYPE.json" ] && cp -f "../uploader-keys/$UPLOADER_TYPE.json" "config.json" || ./gen-alidriveconf.sh - [ -f "../uploader-keys/$UPLOADER_TYPE.db" ] && cp -f "../uploader-keys/$UPLOADER_TYPE.db" "db.db" - python3 main.py || echo "Never mind." - popd - ./uploader-cache/gen-downloaded-hash.sh - [ -d "uploader-keys" ] || mkdir -p "uploader-keys" - cp -f "aliyundrive-uploader/config.json" "uploader-keys/$UPLOADER_TYPE.json" - cp -f "aliyundrive-uploader/db.db" "uploader-keys/$UPLOADER_TYPE.db" - cleanup-workflow-runs: runs-on: ubuntu-18.04 needs: update-mirror diff --git a/README.md b/README.md index 4bbc323..bcef21c 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,3 @@ ## Reference [SuLingGG/OpenWrt-Mini/update-immortalwrt-1806-dl-mirror.yml](https://github.com/SuLingGG/OpenWrt-Mini/blob/381524b297bc346c3afbf779eef8601ffe6c469f/.github/workflows/update-immortalwrt-1806-dl-mirror.yml).
-[MoeClub/OneList/OneDriveUploader](https://github.com/MoeClub/OneList/tree/206b44d9a129d383211806f02a600a96893e2445/OneDriveUploader).
-[Hidove/aliyundrive-uploader](https://github.com/Hidove/aliyundrive-uploader/tree/d16b104bc924b390c82789338dd9f60e7ed994ba). +[MoeClub/OneList/OneDriveUploader](https://github.com/MoeClub/OneList/tree/206b44d9a129d383211806f02a600a96893e2445/OneDriveUploader). diff --git a/aliyundrive-uploader/AliyunDrive.py b/aliyundrive-uploader/AliyunDrive.py deleted file mode 100644 index 72d7560..0000000 --- a/aliyundrive-uploader/AliyunDrive.py +++ /dev/null @@ -1,345 +0,0 @@ -# -*- coding: utf-8 -*- -# +------------------------------------------------------------------- -# | 阿里云盘上传类 -# +------------------------------------------------------------------- -# | Author: Pluto -# +------------------------------------------------------------------- - -import json -import math -import os -import requests -from tqdm import tqdm - -from common import DATA, LOCK_TOKEN_REFRESH -import common - -requests.packages.urllib3.disable_warnings() - -""" -status: 0:未上传,1:上传成功,2:正在上传,-1:上传失败 -""" - - -class AliyunDrive: - def __init__(self, drive_id, root_path, chunk_size=10485760): - self.status = 0 - self.create_time = 0 - self.start_time = common.get_timestamp() - self.finish_time = 0 - self.spend_time = 0 - self.drive_id = drive_id - self.root_path = root_path - self.chunk_size = chunk_size - self.filepath = None - self.realpath = None - self.filename = None - self.hash = None - self.part_info_list = [] - self.part_upload_url_list = [] - self.upload_id = 0 - self.file_id = 0 - self.part_number = 0 - self.filesize = 0 - self.headers = { - 'authorization': DATA['access_token'], - 'content-type': 'application/json;charset=UTF-8' - } - self.id = None - - def load_task(self, task): - tmp = [ - "id", - "filepath", - "realpath", - "filesize", - "hash", - "status", - "create_time", - "finish_time", - "spend_time", - "drive_id", - "file_id", - "upload_id", - "part_number", - "chunk_size", - ] - if task['drive_id'] == '' or int(task['drive_id']) == 0: - task['drive_id'] = self.__getattribute__('drive_id') - for v in tmp: - self.__setattr__(v, task[v]) - - def load_file(self, filepath, realpath): - self.filepath = filepath - self.realpath = realpath - self.filename = os.path.basename(self.realpath) - self.print('【{filename}】正在校检文件中,耗时与文件大小有关'.format(filename=self.filename), 'info') - self.hash = common.get_hash(self.realpath) - self.filesize = os.path.getsize(self.realpath) - - self.part_info_list = [] - for i in range(0, math.ceil(self.filesize / self.chunk_size)): - self.part_info_list.append({ - 'part_number': i + 1 - }) - - message = '''================================================= - 文件名:{filename} - hash:{hash} - 文件大小:{filesize} - 文件路径:{filepath} -================================================= -'''.format(filename=self.filename, hash=self.hash, filesize=self.filesize, filepath=self.realpath) - self.print(message, 'info') - - def token_refresh(self): - LOCK_TOKEN_REFRESH.acquire() - try: - data = {"refresh_token": DATA['config']['REFRESH_TOKEN']} - post = requests.post( - 'https://websv.aliyundrive.com/token/refresh', - data=json.dumps(data), - headers={ - 'content-type': 'application/json;charset=UTF-8' - }, - verify=False, - timeout=3 - ) - try: - post_json = post.json() - # 刷新配置中的token - common.set_config('REFRESH_TOKEN', post_json['refresh_token']) - - except Exception as e: - self.print('refresh_token已经失效', 'warn') - raise e - - DATA['access_token'] = post_json['access_token'] - self.headers = { - 'authorization': DATA['access_token'], - 'content-type': 'application/json;charset=UTF-8' - } - DATA['config']['REFRESH_TOKEN'] = post_json['refresh_token'] - finally: - LOCK_TOKEN_REFRESH.release() - return True - - def create(self, parent_file_id): - create_data = { - "drive_id": self.drive_id, - "part_info_list": self.part_info_list, - "parent_file_id": parent_file_id, - "name": self.filename, - "type": "file", - "check_name_mode": "auto_rename", - "size": self.filesize, - "content_hash": self.hash, - "content_hash_name": 'sha1' - } - # 覆盖已有文件 - if DATA['config']['OVERWRITE']: - create_data['check_name_mode'] = 'refuse' - request_post = requests.post( - 'https://api.aliyundrive.com/v2/file/create', - # 'https://api.aliyundrive.com/adrive/v2/file/createWithFolders', - data=json.dumps(create_data), - headers=self.headers, - verify=False - ) - requests_post_json = request_post.json() - if not self.check_auth(requests_post_json): - return self.create(parent_file_id) - # 覆盖已有文件 - if DATA['config']['OVERWRITE'] and requests_post_json.get('exist'): - if self.recycle(requests_post_json.get('file_id')): - self.print('【%s】原有文件回收成功' % self.filename, 'info') - self.print('【%s】重新上传新文件中' % self.filename, 'info') - return self.create(parent_file_id) - - self.part_upload_url_list = requests_post_json.get('part_info_list', []) - self.file_id = requests_post_json.get('file_id') - self.upload_id = requests_post_json.get('upload_id') - common.save_task(self.id, { - 'drive_id': self.drive_id, - 'file_id': self.file_id, - 'upload_id': self.upload_id, - }) - return requests_post_json - - def get_upload_url(self): - self.print('【%s】上传地址已失效正在获取新的上传地址' % self.filename, 'info') - requests_data = { - "drive_id": self.drive_id, - "file_id": self.file_id, - "part_info_list": self.part_info_list, - "upload_id": self.upload_id, - } - requests_post = requests.post( - 'https://api.aliyundrive.com/v2/file/get_upload_url', - data=json.dumps(requests_data), - headers=self.headers, - verify=False - ) - requests_post_json = requests_post.json() - if not self.check_auth(requests_post_json): - return self.get_upload_url() - self.print('【%s】上传地址刷新成功' % self.filename, 'info') - return requests_post_json.get('part_info_list') - - def upload(self): - with open(self.realpath, "rb") as f: - task_log_id = common.log('正在上传【%s】0%%' % self.filename, self.id, 'info') - with tqdm.wrapattr(f, "read", desc='正在上传【%s】' % self.filename, miniters=1, - initial=self.part_number * self.chunk_size, - total=self.filesize, - ascii=True - ) as fs: - - while self.part_number < len(self.part_upload_url_list): - upload_url = self.part_upload_url_list[self.part_number]['upload_url'] - total_size = min(self.chunk_size, self.filesize) - fs.seek(self.part_number * total_size) - res = requests.put( - url=upload_url, - data=common.read_in_chunks(fs, 16 * 1024, total_size), - verify=False, - timeout=None - ) - if 400 <= res.status_code < 600: - common_get_xml_value = common.get_xml_tag_value(res.text, 'Message') - if common_get_xml_value == 'Request has expired.': - self.part_upload_url_list = self.get_upload_url() - continue - common_get_xml_value = common.get_xml_tag_value(res.text, 'Code') - if common_get_xml_value == 'PartNotSequential': - self.part_number -= 1 - continue - elif common_get_xml_value == 'PartAlreadyExist': - pass - else: - self.print(res.text, 'error') - # res.raise_for_status() - return False - self.part_number += 1 - common.update_task_log(task_log_id, - '正在上传【%s】%.2f%%' % ( - self.filename, ((self.part_number * total_size) / self.filesize) * 100)) - udata = { - "part_number": self.part_number, - } - common.save_task(self.id, udata) - - return True - - def complete(self): - complete_data = { - "drive_id": self.drive_id, - "file_id": self.file_id, - "upload_id": self.upload_id - } - complete_post = requests.post( - 'https://api.aliyundrive.com/v2/file/complete', json.dumps(complete_data), - headers=self.headers, - verify=False - ) - - requests_post_json = complete_post.json() - if not self.check_auth(requests_post_json): - return self.complete() - self.finish_time = common.get_timestamp() - self.spend_time = self.finish_time - self.start_time - - if 'file_id' in requests_post_json: - self.print('【{filename}】上传成功!消耗{s}秒'.format(filename=self.filename, s=self.spend_time), 'success') - return True - else: - self.print('【{filename}】上传失败!消耗{s}秒'.format(filename=self.filename, s=self.spend_time), 'warn') - return False - - def create_folder(self, folder_name, parent_folder_id): - create_data = { - "drive_id": self.drive_id, - "parent_file_id": parent_folder_id, - "name": folder_name, - "check_name_mode": "refuse", - "type": "folder" - } - requests_post = requests.post( - 'https://api.aliyundrive.com/adrive/v2/file/createWithFolders', - data=json.dumps(create_data), - headers=self.headers, - verify=False - ) - - requests_post_json = requests_post.json() - if not self.check_auth(requests_post_json): - return self.create_folder(folder_name, parent_folder_id) - return requests_post_json.get('file_id') - - def get_parent_folder_id(self, filepath): - self.print('检索目录中', 'info') - filepath_split = (self.root_path + filepath.lstrip(os.sep)).split(os.sep) - del filepath_split[len(filepath_split) - 1] - path_name = os.sep.join(filepath_split) - if path_name not in DATA['folder_id_dict']: - parent_folder_id = 'root' - parent_folder_name = os.sep - if len(filepath_split) > 0: - for folder in filepath_split: - if folder in ['', 'root']: - continue - parent_folder_id = self.create_folder(folder, parent_folder_id) - parent_folder_name = parent_folder_name.rstrip(os.sep) + os.sep + folder - DATA['folder_id_dict'][parent_folder_name] = parent_folder_id - else: - parent_folder_id = DATA['folder_id_dict'][path_name] - self.print('已存在目录,无需创建', 'info') - - self.print('目录id获取成功{parent_folder_id}'.format(parent_folder_id=parent_folder_id), 'info') - return parent_folder_id - - def recycle(self, file_id): - # https://api.aliyundrive.com/v2/batch - requests_data = { - "requests": [ - { - "body": { - "drive_id": self.drive_id, - "file_id": file_id - }, - "headers": { - "Content-Type": "application/json" - }, - "id": file_id, - "method": "POST", - "url": "/recyclebin/trash" - } - ], - "resource": "file" - } - requests_post = requests.post( - 'https://api.aliyundrive.com/v2/batch', - data=json.dumps(requests_data), - headers=self.headers, - verify=False - ) - requests_post_json = requests_post.json() - if not self.check_auth(requests_post_json): - return self.recycle(file_id) - return True - - def check_auth(self, response_json): - if response_json.get('code') == 'AccessTokenInvalid': - self.print('AccessToken已失效,尝试刷新AccessToken中', 'info') - if self.token_refresh(): - self.print('AccessToken刷新成功,准备返回', 'info') - return False - self.print('无法刷新AccessToken,准备退出', 'error') - if 'code' in response_json.keys(): - self.print(response_json, 'error') - common.suicide() - return True - - def print(self, message, print_type='info'): - func = 'print_' + print_type - return getattr(common, func)(message, self.id) diff --git a/aliyundrive-uploader/Client.py b/aliyundrive-uploader/Client.py deleted file mode 100644 index 956e31e..0000000 --- a/aliyundrive-uploader/Client.py +++ /dev/null @@ -1,235 +0,0 @@ -# -*- coding: utf-8 -*- -# +------------------------------------------------------------------- -# | Client.py -# +------------------------------------------------------------------- -# | Author: Pluto -# +------------------------------------------------------------------- - -# 配置信息 - -import json -import os -import sqlite3 -import sys - -from AliyunDrive import AliyunDrive -from common import get_running_path, get_config, DATA, get_config_file_path, qualify_path, \ - get_all_file_relative, LOCK, get_db_file_path, save_task, get_timestamp, date, suicide -import common - - -class Client(): - tasks = [] - database_file = get_db_file_path() - - def __init__(self): - pass - - def __upload(self, drive): - try: - drive.upload() - except Exception as e: - status = False - for index in range(int(DATA['config']['RETRY'])): - self.print('【%s】正在尝试第%d次重试!' % (drive.filename, index + 1), 'warn', drive.id) - if drive.upload(): - status = True - break - if not status: - drive.status = -1 - return drive - # 提交 - if drive.complete(): - drive.status = 1 - else: - drive.status = -1 - return drive - - def init_config(self): - config = { - "REFRESH_TOKEN": "refresh_token", - "DRIVE_ID": "drive_id", - "ROOT_PATH": "root", - "FILE_PATH": get_running_path(), - "MULTITHREADING": False, - "MAX_WORKERS": 5, - "CHUNK_SIZE": 104857600, - "RESUME": False, - "OVERWRITE": False, - "RETRY": 0, - "RESIDENT": False, - "VERSIONS": "v2021.0729.1800" - } - if not os.path.exists(get_config_file_path()): - self.print('请配置好config.json后重试', 'error') - with open(get_config_file_path(), 'w') as f: - f.write(json.dumps(config)) - suicide() - try: - config.update(get_config()) - DATA['config'] = config - - except Exception as e: - self.print('请配置好config.json后重试', 'error') - raise e - - def init_command_line_parameter(self): - unset_keys = [] - - for k in range(len(sys.argv)): - if sys.argv[k] == '--resident' or sys.argv[k] == '-r': - DATA['config']['RESIDENT'] = True - unset_keys.append(k) - - for v in unset_keys: - del sys.argv[v] - - # 命令分配 - if len(sys.argv) == 3: - # 网盘保存路径 - DATA['config']['ROOT_PATH'] = sys.argv[2] - # 读取文件路径 - abspath = os.path.abspath(sys.argv[1]) - - elif len(sys.argv) == 2: - # 读取文件路径 - abspath = os.path.abspath(sys.argv[1]) - else: - # 读取配置文件里的 - abspath = DATA['config']['FILE_PATH'] - - DATA['config']['FILE_PATH'] = os.path.dirname(abspath) - DATA['config']['ROOT_PATH'] = qualify_path(DATA['config']['ROOT_PATH']) - if not DATA['config']['RESIDENT']: - if os.path.exists(abspath): - if os.path.isdir(abspath): - # 目录上传 - self.tasks = get_all_file_relative(abspath) - self.tasks = list(map(lambda x: os.path.basename(abspath) + os.sep + x, self.tasks)) - else: - # 单文件上传 - self.tasks = [os.path.basename(abspath)] - else: - - self.print('该文件夹不存在:%s,请重试' % abspath, 'error') - # 获取目录的父目录以上传该目录并且格式化目录 - - DATA['config']['FILE_PATH'] = qualify_path(DATA['config']['FILE_PATH']) - - def init_database(self): - conn = sqlite3.connect(self.database_file) - cursor = conn.cursor() - cursor.execute('''create table IF NOT EXISTS task -( - id INTEGER - primary key autoincrement, - filepath TEXT default '' not null, - realpath TEXT default '' not null, - filesize INTEGER, - hash TEXT default '' not null, - status INTEGER default 0 not null, - drive_id TEXT default '' not null, - file_id TEXT default '' not null, - upload_id TEXT default '' not null, - part_number INTEGER default 0 not null, - chunk_size INTEGER default 104857600 not null, - create_time INTEGER default 0 not null, - finish_time INTEGER default 0 not null, - spend_time INTEGER default 0 not null -);''') - cursor.execute('''create table IF NOT EXISTS task_log -( - id INTEGER not null - constraint task_log_pk - primary key autoincrement, - task_id INTEGER, - log_level TEXT default 'info' not null, - content TEXT default '' not null, - create_time INTEGER default 0 not null -);''') - - def upload_file(self, task): - save_task(task['id'], { - 'status': 2 - }) - drive = AliyunDrive(DATA['config']['DRIVE_ID'], DATA['config']['ROOT_PATH'], DATA['config']['CHUNK_SIZE']) - # 加载任务队列 - drive.load_task(task) - # 刷新token - # drive.token_refresh() - if not os.path.exists(task['realpath']): - drive.status = -1 - return drive - drive.load_file(task['filepath'], task['realpath']) - # 创建目录 - LOCK.acquire() - try: - parent_folder_id = drive.get_parent_folder_id(drive.filepath) - finally: - LOCK.release() - # 断点续传 - if DATA['config']['RESUME'] and DATA['config']['DRIVE_ID'] == task['drive_id']: - if 0 not in [ - drive.drive_id, - drive.part_number, - drive.chunk_size, - ] and not drive.file_id and not drive.upload_id: - # 获取上传地址 - drive.part_upload_url_list = drive.get_upload_url() - # 上传 - return self.__upload(drive) - - # 创建上传 - create_post_json = drive.create(parent_folder_id) - if 'rapid_upload' in create_post_json and create_post_json['rapid_upload']: - drive.finish_time = get_timestamp() - drive.spend_time = drive.finish_time - drive.start_time - - self.print('【{filename}】秒传成功!消耗{s}秒'.format(filename=drive.filename, s=drive.spend_time), 'success', - drive.id) - drive.status = 1 - return drive - # 上传 - return self.__upload(drive) - - def save_task(self, task): - task_id = task.id - tmp = [ - "filepath", - "realpath", - "filesize", - "hash", - "status", - "create_time", - "finish_time", - "spend_time", - "drive_id", - "file_id", - "upload_id", - "part_number", - "chunk_size", - ] - - data = {} - for v in tmp: - data[v] = task.__getattribute__(v) - if data[v] is None: - data[v] = '' - - return save_task(task_id, data) - - def print_config_info(self): - s = '' - for k in DATA['config'].keys(): - s += "\n\t\t%s:%s" % (k, DATA['config'][k]) - - content = '''================================================= - 阿里云盘上传工具启动成功 - 当前时间:%s%s -================================================= -''' % (date(get_timestamp()), s) - self.print(content, 'info') - - def print(self, message, print_type, id=0): - func = 'print_' + print_type - return getattr(common, func)(message, id) diff --git a/aliyundrive-uploader/common.py b/aliyundrive-uploader/common.py deleted file mode 100644 index da170a6..0000000 --- a/aliyundrive-uploader/common.py +++ /dev/null @@ -1,252 +0,0 @@ -# -*- coding: utf-8 -*- -# +------------------------------------------------------------------- -# | 公共函数类 -# +------------------------------------------------------------------- -# | Author: Pluto -# +------------------------------------------------------------------- - -import hashlib -import json -import os -import sys -import threading -import time -from xml.dom.minidom import parseString - -from sqlite import sqlite - -LOCK = threading.Lock() -LOCK_TOKEN_REFRESH = threading.Lock() -DATA = { - 'access_token': '', - 'time_period': 600, - 'config': {}, - 'folder_id_dict': {}, - 'task_template': { - "filepath": None, - "filesize": 0, - "hash": '', - "status": 0, - "create_time": time.time(), - "finish_time": 0, - "spend_time": 0, - "drive_id": '0', - "file_id": '0', - "upload_id": '0', - "part_number": 0, - "chunk_size": 104857600, - } -} - - -def suicide(): - os._exit(1) - - -def ctrl_c(signalnum, frame): - suicide() - - -# 处理路径 -def qualify_path(path): - if not path: - return '' - return path.replace('/', os.sep).replace('\\\\', os.sep).rstrip(os.sep) + os.sep - - -# 获取运行目录 -def get_running_path(path=''): - if getattr(sys, 'frozen', False): - return os.path.dirname(sys.executable) + path - elif __file__: - return os.path.dirname(__file__) + path - - -def get_config_file_path(): - return get_running_path('/config.json') - - -def get_db_file_path(): - return get_running_path('/db.db') - - -# 读取配置项 -# @param key 取指定配置项,若不传则取所有配置[可选] -def get_config(key=None): - # 判断是否从文件读取配置 - if not os.path.exists(get_config_file_path()): return None - - with open(get_config_file_path(), 'rb') as f: - f_body = f.read().decode('utf-8') - if not f_body: return None - config = json.loads(f_body) - for value in [ - 'MULTITHREADING', - 'RESUME', - 'OVERWRITE', - 'RESIDENT', - ]: - if value in config: - DATA['config'][value] = bool(config[value]) - config['ROOT_PATH'] = qualify_path(config.get('ROOT_PATH')).rstrip(os.sep) - # 取指定配置项 - if key: - if key in config: return config[key] - return None - return config - - -def set_config(key, value): - config = get_config() - # 是否需要初始化配置项 - if not config: config = {} - # 是否需要设置配置值 - if key: - config[key] = value - with open(get_config_file_path(), 'w') as f: - f.write(json.dumps(config)) - f.flush() - return True - - -def get_db(): - return sqlite().dbfile(get_db_file_path()) - - -def get_hash(filepath, block_size=2 * 1024 * 1024): - with open(filepath, 'rb') as f: - sha1 = hashlib.sha1() - while True: - data = f.read(block_size) - if not data: - break - sha1.update(data) - return sha1.hexdigest() - - -def get_all_file(path): - result = [] - get_dir = os.listdir(path) - for i in get_dir: - sub_dir = os.path.join(path, i) - if os.path.isdir(sub_dir): - result.extend(get_all_file(sub_dir)) - else: - result.append(sub_dir) - return result - - -def get_all_file_relative(path): - result = [] - if not os.path.exists(path): - return result - get_dir = os.listdir(path) - for i in get_dir: - sub_dir = os.path.join(path, i) - if os.path.isdir(sub_dir): - all_file = get_all_file_relative(sub_dir) - all_file = map(lambda x: i + os.sep + x, all_file) - result.extend(all_file) - else: - result.append(i) - return result - - -def print_info(message, id=None): - message = message.__str__() - # i = random.randint(34, 37) - i = 36 - log(message, id, 'info') - print('\033[7;30;{i}m{message}\033[0m'.format(message=message, i=i)) - - -def print_warn(message, id=None): - message = message.__str__() - log(message, id, 'warn') - print('\033[7;30;33m{message}\033[0m'.format(message=message)) - - -def print_error(message, id=None): - message = message.__str__() - log(message, id, 'error') - print('\033[7;30;31m{message}\033[0m'.format(message=message)) - - -def print_success(message, id=None): - message = message.__str__() - log(message, id, 'success') - print('\033[7;30;32m{message}\033[0m'.format(message=message)) - - -def date(timestamp=None): - if not timestamp: - timestamp = get_timestamp() - return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) - - -def get_timestamp(): - return int(time.time()) - - -def log(message, id=None, log_level='info'): - task_log_id = None - if not id is None: - db = get_db() - idata = { - 'task_id': id, - 'content': message, - 'log_level': log_level, - 'create_time': get_timestamp(), - } - task_log_id = db.table('task_log').insert(idata) - file = get_running_path('/log/' + time.strftime("%Y-%m-%d", time.localtime()) + '.log') - if not os.path.exists(os.path.dirname(file)): - os.mkdir(os.path.dirname(file)) - with open(file, 'a') as f: - f.write('【{date}】{message}\n'.format(date=date(time.time()), message=message)) - return task_log_id - - -def update_task_log(task_log_id, message): - db = get_db() - return db.table('task_log').where('id=?', task_log_id).update({ - 'content': message - }) - - -def get_xml_tag_value(xml_string, tag_name): - DOMTree = parseString(xml_string) - DOMTree = DOMTree.documentElement - tag = DOMTree.getElementsByTagName(tag_name) - if len(tag) > 0: - for node in tag[0].childNodes: - if node.nodeType == node.TEXT_NODE: - return node.data - return False - - -def load_task(): - db = get_db() - return db.table('task').where('finish_time=?', 0).order('create_time asc').limit('25').select() - - -def save_task(task_id, udata): - db = get_db() - return db.table('task').where('id=?', (task_id,)).update(udata) - - -def create_task(data): - db = get_db() - db.table('task').insert(data) - - -def read_in_chunks(file_object, chunk_size=16 * 1024, total_size=10 * 1024 * 1024): - load_size = 0 - while True: - if load_size >= total_size: - break - data = file_object.read(chunk_size) - if not data: - break - load_size += 16 * 1024 - yield data diff --git a/aliyundrive-uploader/gen-alidriveconf.sh b/aliyundrive-uploader/gen-alidriveconf.sh deleted file mode 100755 index 32ad8e3..0000000 --- a/aliyundrive-uploader/gen-alidriveconf.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash -# SPDX-License-Identifier: GPL-3.0-only -# -# Copyright (C) 2021 ImmortalWrt.org - -cat > "$PWD/config.json" < -# +------------------------------------------------------------------- - -import os -import signal -import time -from concurrent.futures import ThreadPoolExecutor - -from AliyunDrive import AliyunDrive -from Client import Client -from common import DATA, print_error, get_db, get_timestamp, print_info, load_task, create_task, suicide, ctrl_c - -if __name__ != '__main__': - suicide() - - -signal.signal(signal.SIGINT, ctrl_c) -signal.signal(signal.SIGTERM, ctrl_c) - -client = Client() -# 数据库初始化 -client.init_database() -# 配置信息初始化 -client.init_config() -# 命令行参数初始化,读取文件列表 -client.init_command_line_parameter() -# 输出配置信息 -client.print_config_info() -db = get_db() -# 是否常驻运行 -if not DATA['config']['RESIDENT']: - for v in client.tasks: - tmp = DATA['task_template'].copy() - tmp.update({ - "filepath": v, - "realpath": DATA['config']['FILE_PATH'] + v, - "create_time": get_timestamp(), - }) - find = db.table('task').where('filepath=? and realpath=?', (tmp['filepath'], tmp['realpath'],)).find() - if find: - print_info('【%s】已存在任务队列中,跳过' % tmp['filepath']) - else: - create_task(tmp) - - -def thread(task): - drive = client.upload_file(task) - drive.finish_time = get_timestamp() - drive.spend_time = drive.finish_time - drive.start_time - if drive.status != 1: - print_error(os.path.basename(drive.filepath) + ' 上传失败') - client.save_task(drive) - - -def distribute_thread(tasks): - if not DATA['config']['MULTITHREADING'] or int(DATA['config']['MAX_WORKERS']) <= 0: - for task in tasks: - thread(task) - else: - with ThreadPoolExecutor(max_workers=int(DATA['config']['MAX_WORKERS'])) as executor: - for task in tasks: - # 提交线程 - executor.submit(thread, task) - - -# 定时任务 -def crontab(): - def crontab_tasks(): - # 定时刷新token - (AliyunDrive(DATA['config']['DRIVE_ID'], DATA['config']['ROOT_PATH'], - DATA['config']['CHUNK_SIZE'])).token_refresh() - - time_period = DATA['time_period'] - crontab_tasks() - while True: - if time_period <= 0: - try: - crontab_tasks() - except Exception as e: - print_error(e.__str__()) - finally: - time_period = DATA['time_period'] - else: - time_period -= 1 - time.sleep(1) - - -(ThreadPoolExecutor()).submit(crontab) - -is_RESIDENT = DATA['config']['RESIDENT'] -while True: - client.tasks = load_task() - if len(client.tasks) <= 0: - if not is_RESIDENT: - suicide() - else: - print_info('当前无任务,等待新的任务队列中', 0) - time.sleep(5) - continue - distribute_thread(client.tasks) diff --git a/aliyundrive-uploader/requirements.txt b/aliyundrive-uploader/requirements.txt deleted file mode 100644 index 9fe130a..0000000 --- a/aliyundrive-uploader/requirements.txt +++ /dev/null @@ -1,7 +0,0 @@ -certifi==2020.12.5 -chardet==4.0.0 -idna==2.10 -requests==2.25.1 -tqdm==4.59.0 -urllib3==1.26.5 -wincertstore==0.2 \ No newline at end of file diff --git a/aliyundrive-uploader/sqlite.py b/aliyundrive-uploader/sqlite.py deleted file mode 100644 index a54d5b1..0000000 --- a/aliyundrive-uploader/sqlite.py +++ /dev/null @@ -1,378 +0,0 @@ -import sys - -import sqlite3 -import os, re, time - - -def get_running_path(path=''): - if getattr(sys, 'frozen', False): - return os.path.dirname(sys.executable) + path - elif __file__: - return os.path.dirname(__file__) + path - - -class sqlite(): - # ------------------------------ - # 数据库操作类 For sqlite3 - # ------------------------------ - DB_FILE = None # 数据库文件 - DB_CONN = None # 数据库连接对象 - DB_TABLE = "" # 被操作的表名称 - OPT_WHERE = "" # where条件 - OPT_LIMIT = "" # limit条件 - OPT_ORDER = "" # order条件 - OPT_GROUPBY = "" # group by条件 - OPT_FIELD = "*" # field条件 - OPT_PARAM = () # where值 - __LOCK = get_running_path() + 'sqlite_lock.pl' - - def __init__(self): - self.DB_FILE = 'data/default.db' - - def GetConn(self): - # 取数据库对象 - try: - if self.DB_CONN == None: - self.DB_CONN = sqlite3.connect(self.DB_FILE) - if sys.version_info[0] == 3: - self.DB_CONN.text_factory = lambda x: str(x, - encoding="utf-8", errors='ignore') - else: - self.DB_CONN.text_factory = lambda x: unicode(x, - "utf-8", "ignore") - except Exception as ex: - print(ex) - return "error: " + str(ex) - - def dbfile(self, path): - if not os.path.isfile(path): - raise RuntimeError("数据库文件不存在。") - self.DB_FILE = path - return self - - def table(self, table): - # 设置表名 - self.DB_TABLE = table - return self - - def where(self, where, param): - # WHERE条件 - if where: - self.OPT_WHERE = " WHERE " + where - self.OPT_PARAM = self.__to_tuple(param) - return self - - def __to_tuple(self, param): - # 将参数转换为tuple - if type(param) != tuple: - if type(param) == list: - param = tuple(param) - else: - param = (param,) - return param - - def order(self, order): - # ORDER条件 - if len(order): - self.OPT_ORDER = " ORDER BY " + order - return self - - def groupby(self, group): - if len(group): - self.OPT_GROUPBY = " GROUP BY " + group - return self - - def limit(self, limit): - # LIMIT条件 - if len(limit): - self.OPT_LIMIT = " LIMIT " + limit - return self - - def field(self, field): - # FIELD条件 - if len(field): - self.OPT_FIELD = field - return self - - def log(self, msg): - log_file = "/www/server/panel/logs/error.log" - if sys.version_info[0] == 3: - with open(log_file, "a", encoding="utf-8") as fp: - fp.write("\n" + msg) - else: - with open(log_file, "a") as fp: - fp.write("\n" + msg) - - def select(self): - # 查询数据集 - self.GetConn() - try: - self.__get_columns() - sql = "SELECT " + self.OPT_FIELD + " FROM " + self.DB_TABLE + self.OPT_WHERE + self.OPT_GROUPBY + self.OPT_ORDER + self.OPT_LIMIT - result = self.DB_CONN.execute(sql, self.OPT_PARAM) - data = result.fetchall() - # 构造字典系列 - if self.OPT_FIELD != "*": - fields = self.__format_field(self.OPT_FIELD.split(',')) - tmp = [] - for row in data: - i = 0 - tmp1 = {} - for key in fields: - tmp1[key] = row[i] - i += 1 - tmp.append(tmp1) - del (tmp1) - data = tmp - del (tmp) - else: - # 将元组转换成列表 - tmp = list(map(list, data)) - data = tmp - del (tmp) - self.__close() - return data - except Exception as ex: - return "error: " + str(ex) - - def get(self): - self.__get_columns() - return self.select() - - def __format_field(self, field): - fields = [] - for key in field: - s_as = re.search('\s+as\s+', key, flags=re.IGNORECASE) - if s_as: - as_tip = s_as.group() - key = key.split(as_tip)[1] - fields.append(key) - return fields - - def __get_columns(self): - if self.OPT_FIELD == '*': - tmp_cols = self.query('PRAGMA table_info(' + self.DB_TABLE + ')', ()) - cols = [] - for col in tmp_cols: - if len(col) > 2: cols.append(col[1]) - if len(cols) > 0: self.OPT_FIELD = ','.join(cols) - - def getField(self, keyName): - # 取回指定字段 - try: - result = self.field(keyName).select() - if len(result) != 0: - return result[0][keyName] - return result - except: - return None - - def setField(self, keyName, keyValue): - # 更新指定字段 - return self.save(keyName, (keyValue,)) - - def find(self): - # 取一行数据 - try: - result = self.limit("1").select() - if len(result) == 1: - return result[0] - return result - except: - return None - - def count(self): - # 取行数 - key = "COUNT(*)" - data = self.field(key).select() - try: - return int(data[0][key]) - except: - return 0 - - def add(self, keys, param): - # 插入数据 - self.write_lock() - self.GetConn() - self.DB_CONN.text_factory = str - try: - values = "" - for key in keys.split(','): - values += "?," - values = values[0:len(values) - 1] - sql = "INSERT INTO " + self.DB_TABLE + "(" + keys + ") " + "VALUES(" + values + ")" - result = self.DB_CONN.execute(sql, self.__to_tuple(param)) - id = result.lastrowid - self.__close() - self.DB_CONN.commit() - self.rm_lock() - return id - except Exception as ex: - return "error: " + str(ex) - - # 插入数据 - def insert(self, pdata): - if not pdata: return False - keys, param = self.__format_pdata(pdata) - return self.add(keys, param) - - # 更新数据 - def update(self, pdata): - if not pdata: return False - keys, param = self.__format_pdata(pdata) - return self.save(keys, param) - - # 构造数据 - def __format_pdata(self, pdata): - keys = pdata.keys() - keys_str = ','.join(keys) - param = [] - for k in keys: param.append(pdata[k]) - return keys_str, tuple(param) - - def addAll(self, keys, param): - # 插入数据 - self.write_lock() - self.GetConn() - self.DB_CONN.text_factory = str - try: - values = "" - for key in keys.split(','): - values += "?," - values = values[0:len(values) - 1] - sql = "INSERT INTO " + self.DB_TABLE + "(" + keys + ") " + "VALUES(" + values + ")" - result = self.DB_CONN.execute(sql, self.__to_tuple(param)) - self.rm_lock() - return True - except Exception as ex: - return "error: " + str(ex) - - def commit(self): - self.__close() - self.DB_CONN.commit() - - def save(self, keys, param): - # 更新数据 - self.write_lock() - self.GetConn() - self.DB_CONN.text_factory = str - try: - opt = "" - for key in keys.split(','): - opt += key + "=?," - opt = opt[0:len(opt) - 1] - sql = "UPDATE " + self.DB_TABLE + " SET " + opt + self.OPT_WHERE - # 处理拼接WHERE与UPDATE参数 - tmp = list(self.__to_tuple(param)) - for arg in self.OPT_PARAM: - tmp.append(arg) - self.OPT_PARAM = tuple(tmp) - result = self.DB_CONN.execute(sql, self.OPT_PARAM) - self.__close() - self.DB_CONN.commit() - self.rm_lock() - return result.rowcount - except Exception as ex: - return "error: " + str(ex) - - def delete(self, id=None): - # 删除数据 - self.write_lock() - self.GetConn() - try: - if id: - self.OPT_WHERE = " WHERE id=?" - self.OPT_PARAM = (id,) - sql = "DELETE FROM " + self.DB_TABLE + self.OPT_WHERE - result = self.DB_CONN.execute(sql, self.OPT_PARAM) - self.__close() - self.DB_CONN.commit() - self.rm_lock() - return result.rowcount - except Exception as ex: - return "error: " + str(ex) - - def execute(self, sql, param=(), auto_commit=True): - # 执行SQL语句返回受影响行 - self.write_lock() - self.GetConn() - try: - result = self.DB_CONN.execute(sql, self.__to_tuple(param)) - if auto_commit: - self.DB_CONN.commit() - self.rm_lock() - return result.rowcount - except Exception as ex: - return "error: " + str(ex) - - # 是否有锁 - def is_lock(self): - n = 0 - while os.path.exists(self.__LOCK): - n += 1 - if n > 100: - self.rm_lock() - break - time.sleep(0.01) - - # 写锁 - def write_lock(self): - self.is_lock() - with(open(self.__LOCK, 'w+'))as f: - f.write("True") - - # 解锁 - def rm_lock(self): - if os.path.exists(self.__LOCK): - os.remove(self.__LOCK) - - def query(self, sql, param=()): - # 执行SQL语句返回数据集 - self.GetConn() - try: - result = self.DB_CONN.execute(sql, self.__to_tuple(param)) - # self.log("result:" + str(result)) - # 将元组转换成列表 - data = list(map(list, result)) - return data - except Exception as ex: - return "error: " + str(ex) - - def create(self, name): - # 创建数据表 - self.write_lock() - self.GetConn() - with(open('data/' + name + '.sql', 'rb')) as f: - script = f.read().decode('utf-8') - result = self.DB_CONN.executescript(script) - self.DB_CONN.commit() - self.rm_lock() - return result.rowcount - - def fofile(self, filename): - # 执行脚本 - self.write_lock() - self.GetConn() - with(open(filename, 'rb')) as f: - script = f.read().decode('utf-8') - result = self.DB_CONN.executescript(script) - self.DB_CONN.commit() - self.rm_lock() - return result.rowcount - - def __close(self): - # 清理条件属性 - self.OPT_WHERE = "" - self.OPT_FIELD = "*" - self.OPT_ORDER = "" - self.OPT_LIMIT = "" - self.OPT_GROUPBY = "" - self.OPT_PARAM = () - - def close(self): - # 释放资源 - try: - self.DB_CONN.close() - self.DB_CONN = None - except: - pass