Skip to content

Commit

Permalink
feat(app):
Browse files Browse the repository at this point in the history
- add apscheduler
- reorder scheduling process
  • Loading branch information
MorvanZhou committed May 12, 2024
1 parent aaa4707 commit 541add5
Show file tree
Hide file tree
Showing 41 changed files with 1,409 additions and 827 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ whoosh~=2.7.4
jieba>=0.42.1
starlette>=0.37.2
jinja2~=3.1.3
apscheduler~=3.10.4
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ install_requires =
jieba>=0.42.1
starlette>=0.27.0
jinja2~=3.1.3
apscheduler~=3.10.4

[options.packages.find]
where = src
Expand Down
2 changes: 2 additions & 0 deletions src/retk/.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ PLUGINS=true
DB_NAME=rethink

# auth secret
JWT_REFRESH_EXPIRED_DAYS=7300
JWT_ACCESS_EXPIRED_MINS=120
JWT_KEY_PUB="ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQC7EqXzFLuLAFWTrcRi8UTA8sEpsoP+peDR+1KgBiOzwfn9PoJhAE6XsrsDC7tRr0nNKWPD95dAM6ySEHNEUPmkxvKRYGvkgp8s4Axv+9Lar63v7ZPJRbumI/JRzOOJvqBAYFUBX9OIGWGDJow8Dq0tLaj9t39lq2AStmIA3zIBreaFAH/cH1Ig5Z5zdD/M1T4AGl0kHoYKBh+jew7syyZcM22cKLCilT9oWTWpmJx17mf/Q17+7t4F8NS3ccQ6NpqpWYT03D6tgC/HgzIT6iMuJxdtil+e5/PcCeqaigBqehxwCDy5si9W7aOXALGZqmM5M68Lp2mn6386MKXh0WPf0Q2D1HSQNU2fdbdJ4EfHFwq/kWiFLd2u3AdxSnZaNYBH1pd4X2TXEtdmtAFwuJ/i5GQBs3Fy/Ukbdfrkc8ywzpsxTlCUnyiWXmWbh1YHME+7E5C4HlinvaD3x+znKmdoRnzo3gAlE9Q5fxq0D72RipDOqo0RH/PV9p904ZN9t+gj3TTNAbLf0ISbsVjw8ZLYg43pPAeHVwQVKLlR7YCHdsD8LoDqZSzWXx047JFlRFkvmBKUuPLDNMwGLhZzmwWoQIcxwDhcn+5NeeV8S3TS7QpXfEc/Z+hSwR10zoFrpNxEoygqhed8AlqrmFkzWy7d54HrhvHFOWmW6rA76VFPTQ=="
JWT_KEY="-----BEGIN PRIVATE KEY-----
MIIJQwIBADANBgkqhkiG9w0BAQEFAASCCS0wggkpAgEAAoICAQC7EqXzFLuLAFWT
Expand Down
2 changes: 1 addition & 1 deletion src/retk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from . import version_manager # noqa: F401
from ._version import __version__ # noqa: F401
from .core import scheduler
from .models import tps # noqa: F401
from .plugins import schedule # noqa: F401
from .plugins.base import ( # noqa: F401
Plugin,
add_plugin,
Expand Down
8 changes: 3 additions & 5 deletions src/retk/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from fastapi.middleware.cors import CORSMiddleware

from retk import const, config, safety, utils
from retk.core import async_task
from retk.core import scheduler
from retk.logger import logger, add_rotating_file_handler
from .models.client import client
from .routes import (
Expand All @@ -17,7 +17,6 @@
files,
plugin,
self_hosted,
app_system,
account,
admin,
statistic,
Expand Down Expand Up @@ -48,7 +47,6 @@
app_captcha,
files,
plugin,
app_system,
account,
admin,
statistic,
Expand Down Expand Up @@ -80,5 +78,5 @@ async def startup_event():
# local finish up
utils.local_finish_up()

# async threading task
async_task.init()
# schedule job
scheduler.start()
30 changes: 0 additions & 30 deletions src/retk/controllers/app_system.py

This file was deleted.

8 changes: 4 additions & 4 deletions src/retk/controllers/self_hosted.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
from bson import ObjectId

from retk import const, __version__
from retk.core import self_hosted
from retk.core.notice import put_system_notice
from retk.logger import logger
from retk.models.client import client
from retk.models.tps import AuthedUser
from retk.utils import get_latest_version, parse_version

_local_system_authed_user = AuthedUser(
u=AuthedUser.User(
Expand Down Expand Up @@ -44,12 +44,12 @@
)


async def notice_new_version():
remote, code = await get_latest_version()
async def notice_new_pkg_version():
remote, code = await self_hosted.get_latest_pkg_version()
if code != const.CodeEnum.OK:
logger.error("get latest version failed")
return
local = parse_version(__version__)
local = self_hosted.parse_version(__version__)
if local is None:
logger.error("parse version failed")
return
Expand Down
2 changes: 2 additions & 0 deletions src/retk/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@
files,
account,
statistic,
self_hosted,
notice,
)
2 changes: 1 addition & 1 deletion src/retk/core/account/app_captcha.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
img_captcha = ImageCaptcha(font_sizes=(35, 30, 32))
audio_captcha = AudioCaptcha()

alphabet = "347ACEFGJKLMNPRTXY"
alphabet = "347ACEFGJLMNPRTY"
alphabet_len = len(alphabet)
code_idx_range = list(range(0, alphabet_len - 1))

Expand Down
9 changes: 4 additions & 5 deletions src/retk/core/account/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import jwt

from retk import const, config, regex, utils
from retk.core import async_task
from retk.core import scheduler


class EmailServer:
Expand Down Expand Up @@ -87,10 +87,9 @@ def _send(self, recipients: List[str], subject: str, html_message: str) -> const
html_body = MIMEText(html_message, 'html', 'utf-8')
msg.attach(html_body)

async_task.put_task(
task_name=async_task.TaskName.SEND_EMAIL,
recipients=recipients,
subject=msg.as_string()
scheduler.run_once_now(
func=scheduler.tasks.email.send,
kwargs={"recipients": recipients, "subject": msg.as_string()},
)
return const.CodeEnum.OK

Expand Down
1 change: 0 additions & 1 deletion src/retk/core/async_task/__init__.py

This file was deleted.

48 changes: 0 additions & 48 deletions src/retk/core/async_task/manager.py

This file was deleted.

6 changes: 6 additions & 0 deletions src/retk/core/notice.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ async def put_system_notice(
if not res.acknowledged:
return const.CodeEnum.OPERATION_FAILED
return const.CodeEnum.OK


async def get_unscheduled_system_notices() -> list[NoticeManagerDelivery]:
return await client.coll.notice_system.find({
"scheduled": False,
}).to_list(None)
10 changes: 10 additions & 0 deletions src/retk/core/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from . import tasks
from . import timing
from .base import (
start,
stop,
run_once_at,
run_once_after,
run_once_now,
run_every_at,
)
115 changes: 115 additions & 0 deletions src/retk/core/scheduler/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from datetime import datetime, timedelta
from typing import Callable, Optional, Tuple, Dict, Any

from apscheduler.schedulers.background import BackgroundScheduler

"""
- BlockingScheduler:
use when the scheduler is the only thing running in your process
- BackgroundScheduler:
use when you’re not using any of the frameworks below,
and want the scheduler to run in the background inside your application
- AsyncIOScheduler:
use if your application uses the asyncio module
- GeventScheduler:
use if your application uses gevent
- TornadoScheduler:
use if you’re building a Tornado application
- TwistedScheduler:
use if you’re building a Twisted application
- QtScheduler:
use if you’re building a Qt application
"""

# a separate thread
_scheduler = BackgroundScheduler()


def start():
_scheduler.start()


def stop():
_scheduler.shutdown()


def _get_default(args, kwargs):
if args is None:
args = ()
if kwargs is None:
kwargs = {}
return args, kwargs


def run_once_at(
func: Callable,
time: datetime,
args: Optional[Tuple] = None,
kwargs: Optional[Dict[str, Any]] = None,
):
args, kwargs = _get_default(args, kwargs)
_scheduler.add_job(
func=func,
trigger="date",
run_date=time,
args=args,
kwargs=kwargs,
)


def run_once_now(
func: Callable,
args: Optional[Tuple] = None,
kwargs: Optional[Dict[str, Any]] = None,
):
return run_once_at(func=func, time=datetime.now(), args=args, kwargs=kwargs)


def run_once_after(
func: Callable,
second: float,
args: Optional[Tuple] = None,
kwargs: Optional[Dict[str, Any]] = None,
):
return run_once_at(
func=func,
time=datetime.now() + timedelta(seconds=second),
args=args,
kwargs=kwargs,
)


def run_every_at(
func: Callable,
second: Optional[int] = None,
minute: Optional[int] = None,
hour: Optional[int] = None,
day: Optional[int] = None,
month: Optional[int] = None,
day_of_week: Optional[int] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
args: Optional[Tuple] = None,
kwargs: Optional[Dict[str, Any]] = None,
):
args, kwargs = _get_default(args, kwargs)
_scheduler.add_job(
func=func,
trigger="cron",
second=second,
minute=minute,
hour=hour,
day=day,
month=month,
day_of_week=day_of_week,
start_date=start_date,
end_date=end_date,
args=args,
kwargs=kwargs,
)
4 changes: 4 additions & 0 deletions src/retk/core/scheduler/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from . import (
test,
email,
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
from retk import config


def task(
def send(
recipients: list,
subject: str,
) -> str:
server = None
for i in range(3):
for _ in range(3):
try:
server = smtplib.SMTP('smtp.office365.com', 587)
except smtplib.SMTPServerDisconnected as e:
except smtplib.SMTPServerDisconnected:
pass
if server is None:
return f"try send email task 3 times, but failed, SMTPServerDisconnected"
Expand Down
2 changes: 2 additions & 0 deletions src/retk/core/scheduler/tasks/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def print_test(txt: str):
print(f"test and print '{txt}'")
File renamed without changes.
Loading

0 comments on commit 541add5

Please sign in to comment.