From ca5e35ce4fa797d349d8e48272961222d75f6820 Mon Sep 17 00:00:00 2001 From: morvanzhou Date: Fri, 17 May 2024 02:02:22 +0800 Subject: [PATCH] feature(app): - fix unittest - add get system notices --- src/retk/controllers/manager.py | 19 +++ src/retk/controllers/schemas/manager.py | 17 +++ src/retk/core/notice.py | 118 +++-------------- src/retk/core/scheduler/__init__.py | 2 +- .../core/scheduler/{base.py => schedule.py} | 11 +- src/retk/core/scheduler/tasks/__init__.py | 1 + src/retk/core/scheduler/tasks/notice.py | 120 +++++++++++++++++ src/retk/models/client.py | 39 +++--- src/retk/routes/manager.py | 18 ++- tests/test_api.py | 24 +++- tests/test_core_local.py | 120 ++++++++++------- tests/test_core_remote.py | 124 +++++++++++------- tests/test_core_scheduler.py | 2 +- 13 files changed, 395 insertions(+), 220 deletions(-) rename src/retk/core/scheduler/{base.py => schedule.py} (97%) create mode 100644 src/retk/core/scheduler/tasks/notice.py diff --git a/src/retk/controllers/manager.py b/src/retk/controllers/manager.py index dfcdfa8..a730806 100644 --- a/src/retk/controllers/manager.py +++ b/src/retk/controllers/manager.py @@ -86,3 +86,22 @@ async def post_in_manager_delivery( return schemas.RequestIdResponse( requestId=au.request_id, ) + + +async def get_system_notices( + au: AuthedUser, + page: int, + limit: int, +) -> schemas.manager.GetSystemNoticesResponse: + notices, total = await notice.get_system_notices( + page=page, + limit=limit, + ) + for n in notices: + n["id"] = str(n["_id"]) + del n["_id"] + return schemas.manager.GetSystemNoticesResponse( + requestId=au.request_id, + notices=notices, + total=total, + ) diff --git a/src/retk/controllers/schemas/manager.py b/src/retk/controllers/schemas/manager.py index 9634cb2..ba507fe 100644 --- a/src/retk/controllers/schemas/manager.py +++ b/src/retk/controllers/schemas/manager.py @@ -26,3 +26,20 @@ class ManagerNoticeDeliveryRequest(BaseModel): description="list of user ID if the recipientType is batch" ) publishAt: datetime = Field(..., description="publish time") + + +class GetSystemNoticesResponse(BaseModel): + class Notice(BaseModel): + id: str = Field(description="notice ID") + title: str = Field(description="title") + content: str = Field(description="content") + recipientType: notice.RecipientTypeEnum = Field(description="recipient type") + batchTypeIds: List[str] = Field(description="list of user ID if the recipientType is batch") + publishAt: datetime = Field(description="publish time") + scheduled: bool = Field(description="scheduled") + + requestId: str = Field(max_length=settings.REQUEST_ID_MAX_LENGTH, description="request ID") + notices: List[Notice] = Field( + description="list of notices" + ) + total: int = Field(description="total number of notices") diff --git a/src/retk/core/notice.py b/src/retk/core/notice.py index 45b4276..362dce6 100644 --- a/src/retk/core/notice.py +++ b/src/retk/core/notice.py @@ -1,4 +1,3 @@ -import asyncio from datetime import datetime from typing import List, Dict, Optional, Tuple @@ -22,6 +21,13 @@ async def post_in_manager_delivery( return None, const.CodeEnum.NOT_PERMITTED if publish_at is None: publish_at = datetime.now(tz=utc) + + # if publish_at is not utc, convert it to utc + if publish_at.tzinfo is None or publish_at.tzinfo.utcoffset(publish_at) is None: + publish_at = publish_at.replace(tzinfo=utc) + elif publish_at.tzinfo != utc: + publish_at = publish_at.astimezone(utc) + # add system notice notice: NoticeManagerDelivery = { "_id": ObjectId(), @@ -40,107 +46,15 @@ async def post_in_manager_delivery( return notice, const.CodeEnum.OK -async def __get_users_in_batches(batch_size=100): - # Get the total number of users - total_users = await client.coll.users.count_documents({}) - if config.is_local_db(): - fn = client.coll.users.find( - {}, - ).sort( - [("_id", -1)] - ) - else: - fn = client.coll.users.find( - {}, projection=["id"] - ).sort( - [("_id", -1)] - ) - for i in range(0, total_users, batch_size): - # Sort by _id in descending order and limit the result - users = await fn.skip(i).limit(batch_size).to_list(None) - yield users - - -async def __deliver_scheduled_system_notices_batch( - users: List[Dict[str, str]], - sender_id: str, - notice_id: ObjectId -): - notices = [{ - "_id": ObjectId(), - "senderId": sender_id, - "recipientId": user["id"], - "noticeId": notice_id, - "read": False, - "readTime": None, - } for user in users] - # Insert all notices at once - await client.coll.notice_system.insert_many(notices) - - -def deliver_unscheduled_system_notices(): - async def _deliver_unscheduled_system_notices(): - unscheduled = await client.coll.notice_manager_delivery.find({ - "scheduled": False, - }).to_list(None) - for notice in unscheduled: - notice_id = notice["_id"] - sender_id = notice["senderId"] - recipient_type = notice["recipientType"] - if notice["publishAt"] <= datetime.now(tz=utc): - # send notice - if recipient_type == const.notice.RecipientTypeEnum.ALL.value: - async for users in __get_users_in_batches(batch_size=100): - await __deliver_scheduled_system_notices_batch( - users=users, - sender_id=sender_id, - notice_id=notice_id - ) - elif recipient_type == const.notice.RecipientTypeEnum.BATCH.value: - batch_type_ids = notice["batchTypeIds"] - # Create a list of notices - notices = [{ - "_id": ObjectId(), - "senderId": sender_id, - "recipientId": user_id, - "noticeId": notice_id, - "read": False, - "readTime": None, - } for user_id in batch_type_ids] - # Insert all notices at once - await client.coll.notice_system.insert_many(notices) - elif recipient_type == const.notice.RecipientTypeEnum.ADMIN.value: - # Get all admins - admins = await client.coll.users.find( - {"type": const.USER_TYPE.ADMIN.value}, {"id", 1}).to_list(None) - await __deliver_scheduled_system_notices_batch( - users=admins, - sender_id=sender_id, - notice_id=notice_id - ) - elif recipient_type == const.notice.RecipientTypeEnum.MANAGER.value: - # Get all managers - managers = await client.coll.users.find( - {"type": const.USER_TYPE.MANAGER.value}, {"id", 1}).to_list(None) - await __deliver_scheduled_system_notices_batch( - users=managers, - sender_id=sender_id, - notice_id=notice_id - ) - else: - raise ValueError(f"Unknown recipient type: {recipient_type}") - - # Update the notice to indicate that it has been scheduled - await client.coll.notice_manager_delivery.update_one( - {"_id": notice_id}, - {"$set": {"scheduled": True}} - ) - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - res = loop.run_until_complete(_deliver_unscheduled_system_notices()) - loop.close() - return res +async def get_system_notices( + page: int, + limit: int, +) -> Tuple[List[NoticeManagerDelivery], int]: + total = await client.coll.notice_manager_delivery.count_documents({}) + notices = await client.coll.notice_manager_delivery.find( + {} + ).skip(page * limit).limit(limit=limit).to_list(None) + return notices, total async def get_user_notices(au: AuthedUser) -> Tuple[Dict[str, List[Dict]], const.CodeEnum]: diff --git a/src/retk/core/scheduler/__init__.py b/src/retk/core/scheduler/__init__.py index 5108c76..3ba4b8e 100644 --- a/src/retk/core/scheduler/__init__.py +++ b/src/retk/core/scheduler/__init__.py @@ -1,6 +1,6 @@ from . import tasks from . import timing -from .base import ( # noqa: F401 +from .schedule import ( # noqa: F401 init_tasks, start, stop, diff --git a/src/retk/core/scheduler/base.py b/src/retk/core/scheduler/schedule.py similarity index 97% rename from src/retk/core/scheduler/base.py rename to src/retk/core/scheduler/schedule.py index 918818a..270505a 100644 --- a/src/retk/core/scheduler/base.py +++ b/src/retk/core/scheduler/schedule.py @@ -9,6 +9,7 @@ from retk import const from retk.const.settings import MAX_SCHEDULE_JOB_INFO_LEN +from . import tasks """ - BlockingScheduler: @@ -88,11 +89,11 @@ def wrapper(*args, **kwargs): def init_tasks(): # check unscheduled system notices every minute - # run_every_at( - # job_id="deliver_unscheduled_system_notices", - # func=deliver_unscheduled_system_notices, - # second=0, - # ) + run_every_at( + job_id="deliver_unscheduled_system_notices", + func=tasks.notice.deliver_unscheduled_system_notices, + second=0, + ) return diff --git a/src/retk/core/scheduler/tasks/__init__.py b/src/retk/core/scheduler/tasks/__init__.py index 5d71d39..dcc6849 100644 --- a/src/retk/core/scheduler/tasks/__init__.py +++ b/src/retk/core/scheduler/tasks/__init__.py @@ -1,3 +1,4 @@ from . import ( # noqa: F401 email, + notice, ) diff --git a/src/retk/core/scheduler/tasks/notice.py b/src/retk/core/scheduler/tasks/notice.py new file mode 100644 index 0000000..8bcdd08 --- /dev/null +++ b/src/retk/core/scheduler/tasks/notice.py @@ -0,0 +1,120 @@ +import asyncio +from datetime import datetime +from typing import List, Dict + +from bson import ObjectId +from bson.tz_util import utc + +from retk import const, config +from retk.models.client import init_mongo + + +async def __get_users_in_batches(db, batch_size=100): + # Get the total number of users + total_users = await db["users"].count_documents({}) + if config.is_local_db(): + fn = db["users"].find( + {}, + ).sort( + [("_id", -1)] + ) + else: + fn = db["users"].find( + {}, projection=["id"] + ).sort( + [("_id", -1)] + ) + for i in range(0, total_users, batch_size): + # Sort by _id in descending order and limit the result + users = await fn.skip(i).limit(batch_size).to_list(None) + yield users + + +async def __deliver_scheduled_system_notices_batch( + db, + users: List[Dict[str, str]], + sender_id: str, + notice_id: ObjectId +): + notices = [{ + "_id": ObjectId(), + "senderId": sender_id, + "recipientId": user["id"], + "noticeId": notice_id, + "read": False, + "readTime": None, + } for user in users] + # Insert all notices at once + await db["noticeSystem"].insert_many(notices) + + +def deliver_unscheduled_system_notices(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + res = loop.run_until_complete(async_deliver_unscheduled_system_notices()) + loop.close() + return res + + +async def async_deliver_unscheduled_system_notices(): + _, db = init_mongo(connection_timeout=5) + unscheduled = await db["noticeManagerDelivery"].find({ + "scheduled": False, + }).to_list(None) + for notice in unscheduled: + notice_id = notice["_id"] + sender_id = notice["senderId"] + recipient_type = notice["recipientType"] + # notice["publishAt"] is utc time but the tzinfo is not set, so we need to set it + if notice["publishAt"].astimezone(utc) <= datetime.now(tz=utc): + # send notice + if recipient_type == const.notice.RecipientTypeEnum.ALL.value: + async for users in __get_users_in_batches(db, batch_size=100): + await __deliver_scheduled_system_notices_batch( + db=db, + users=users, + sender_id=sender_id, + notice_id=notice_id + ) + elif recipient_type == const.notice.RecipientTypeEnum.BATCH.value: + batch_type_ids = notice["batchTypeIds"] + # Create a list of notices + notices = [{ + "_id": ObjectId(), + "senderId": sender_id, + "recipientId": user_id, + "noticeId": notice_id, + "read": False, + "readTime": None, + } for user_id in batch_type_ids] + # Insert all notices at once + await db["noticeSystem"].insert_many(notices) + elif recipient_type == const.notice.RecipientTypeEnum.ADMIN.value: + # Get all admins + admins = await db["users"].find( + {"type": const.USER_TYPE.ADMIN.value}, {"id", 1}).to_list(None) + await __deliver_scheduled_system_notices_batch( + db=db, + users=admins, + sender_id=sender_id, + notice_id=notice_id + ) + elif recipient_type == const.notice.RecipientTypeEnum.MANAGER.value: + # Get all managers + managers = await db["users"].find( + {"type": const.USER_TYPE.MANAGER.value}, {"id", 1}).to_list(None) + await __deliver_scheduled_system_notices_batch( + db=db, + users=managers, + sender_id=sender_id, + notice_id=notice_id + ) + else: + raise ValueError(f"Unknown recipient type: {recipient_type}") + + # Update the notice to indicate that it has been scheduled + await db["noticeManagerDelivery"].update_one( + {"_id": notice_id}, + {"$set": {"scheduled": True}} + ) + return diff --git a/src/retk/models/client.py b/src/retk/models/client.py index 487c903..42bae8e 100644 --- a/src/retk/models/client.py +++ b/src/retk/models/client.py @@ -23,6 +23,27 @@ pass +def init_mongo(connection_timeout: int): + conf = config.get_settings() + if config.is_local_db(): + if not conf.RETHINK_LOCAL_STORAGE_PATH.exists(): + raise FileNotFoundError(f"Path not exists: {conf.RETHINK_LOCAL_STORAGE_PATH}") + db_path = conf.RETHINK_LOCAL_STORAGE_PATH / ".data" / "db" + db_path.mkdir(parents=True, exist_ok=True) + mongo = MongitaClientDisk(db_path) + else: + mongo = AsyncIOMotorClient( + host=conf.DB_HOST, + port=conf.DB_PORT, + username=conf.DB_USER, + password=conf.DB_PASSWORD, + socketTimeoutMS=1000 * connection_timeout, + ) + + db = mongo[config.get_settings().DB_NAME] + return mongo, db + + class Client: coll: Collections = Collections() mongo: Optional[Union["AsyncIOMotorClient", MongitaClientDisk]] = None @@ -51,23 +72,7 @@ async def init(self): await self.try_restore_search() def init_mongo(self): - conf = config.get_settings() - if config.is_local_db(): - if not conf.RETHINK_LOCAL_STORAGE_PATH.exists(): - raise FileNotFoundError(f"Path not exists: {conf.RETHINK_LOCAL_STORAGE_PATH}") - db_path = conf.RETHINK_LOCAL_STORAGE_PATH / ".data" / "db" - db_path.mkdir(parents=True, exist_ok=True) - self.mongo = MongitaClientDisk(db_path) - else: - self.mongo = AsyncIOMotorClient( - host=conf.DB_HOST, - port=conf.DB_PORT, - username=conf.DB_USER, - password=conf.DB_PASSWORD, - socketTimeoutMS=1000 * self.connection_timeout, - ) - - db = self.mongo[config.get_settings().DB_NAME] + self.mongo, db = init_mongo(self.connection_timeout) self.coll.users = db["users"] self.coll.nodes = db["nodes"] self.coll.import_data = db["importData"] diff --git a/src/retk/routes/manager.py b/src/retk/routes/manager.py index 6ec4961..ab95d81 100644 --- a/src/retk/routes/manager.py +++ b/src/retk/routes/manager.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter +from fastapi import APIRouter, Query from retk.controllers import schemas, manager from retk.routes import utils @@ -91,3 +91,19 @@ async def post_in_manager_delivery( req: schemas.manager.ManagerNoticeDeliveryRequest, ) -> schemas.RequestIdResponse: return await manager.post_in_manager_delivery(au=au, req=req) + + +@router.get( + "/notices/system", + status_code=200, + response_model=schemas.manager.GetSystemNoticesResponse, + summary="Get unscheduled system notices", + description="Get unscheduled system notices", +) +@utils.measure_time_spend +async def get_unscheduled_system_notices( + au: ADMIN_AUTH, + p: int = Query(0, ge=0), + limit: int = Query(10, ge=1, le=100), +) -> schemas.manager.GetSystemNoticesResponse: + return await manager.get_system_notices(au=au, page=p, limit=limit) diff --git a/tests/test_api.py b/tests/test_api.py index b99e0c3..30bfb65 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -16,7 +16,7 @@ from retk import const, config, PluginAPICallReturn from retk.application import app -from retk.core import account, scheduler, notice +from retk.core import account, scheduler from retk.models.client import client from retk.models.tps import convert_user_dict_to_authed_user from retk.plugins.register import register_official_plugins, unregister_official_plugins @@ -1353,10 +1353,23 @@ async def test_system_notice(self): headers=self.default_headers, ) self.check_ok_response(resp, 201) + + resp = self.client.get( + "/api/managers/notices/system", + headers=self.default_headers, + ) + rj = self.check_ok_response(resp, 200) + self.assertEqual(1, rj["total"]) + self.assertEqual(1, len(rj["notices"])) + self.assertEqual("title", rj["notices"][0]["title"]) + self.assertEqual("content", rj["notices"][0]["content"]) + self.assertEqual(pa.strftime("%Y-%m-%dT%H:%M:%SZ"), rj["notices"][0]["publishAt"]) + self.assertFalse(rj["notices"][0]["scheduled"]) + scheduler.start() scheduler.run_once_now( job_id="deliver_unscheduled_system_notices1", - func=notice.deliver_unscheduled_system_notices, + func=scheduler.tasks.notice.deliver_unscheduled_system_notices, ) docs = await client.coll.notice_manager_delivery.find( @@ -1379,6 +1392,13 @@ async def test_system_notice(self): self.assertIsNotNone(j.finished_at) scheduler.stop() + resp = self.client.get( + "/api/managers/notices/system", + headers=self.default_headers, + ) + rj = self.check_ok_response(resp, 200) + self.assertTrue(rj["notices"][0]["scheduled"]) + self.set_access_token(u_token) resp = self.client.get( "/api/users/notices", diff --git a/tests/test_core_local.py b/tests/test_core_local.py index 9ce08fc..78eca1b 100644 --- a/tests/test_core_local.py +++ b/tests/test_core_local.py @@ -2,6 +2,7 @@ import shutil import time import unittest +from copy import deepcopy from io import BytesIO from pathlib import Path from textwrap import dedent @@ -17,6 +18,7 @@ from retk import const, core, config from retk.controllers.schemas.user import PatchUserRequest from retk.core.files.importing.async_tasks.utils import update_process +from retk.core.scheduler import tasks from retk.models import db_ops from retk.models.client import client from retk.models.tps import ImportData, AuthedUser, convert_user_dict_to_authed_user @@ -556,48 +558,76 @@ async def test_get_version(self): for num in v: self.assertTrue(isinstance(num, int)) - # async def test_notice(self): - # au = deepcopy(self.au) - # doc, code = await core.notice.post_in_manager_delivery( - # au=au, - # title="title", - # content="content", - # recipient_type=const.notice.RecipientTypeEnum.ALL.value, - # batch_type_ids=[], - # publish_at=None, - # ) - # self.assertEqual(const.CodeEnum.NOT_PERMITTED, code) - # - # au.u.type = const.USER_TYPE.MANAGER.id - # doc, code = await core.notice.post_in_manager_delivery( - # au=au, - # title="title", - # content="content", - # recipient_type=const.notice.RecipientTypeEnum.ALL.value, - # batch_type_ids=[], - # publish_at=None, - # ) - # self.assertEqual(const.CodeEnum.OK, code) - # - # await core.notice.deliver_unscheduled_system_notices() - # - # res = await client.coll.notice_system.find().to_list(None) - # self.assertEqual(1, len(res)) - # self.assertEqual(doc["_id"], res[0]["noticeId"]) - # self.assertEqual(au.u.id, res[0]["senderId"]) - # self.assertFalse(res[0]["read"]) - # - # d = await client.coll.notice_manager_delivery.find_one({"_id": doc["_id"]}) - # self.assertIsNotNone(d) - # self.assertTrue(d["scheduled"]) - # - # n, code = await core.notice.get_user_notices(au) - # self.assertEqual(const.CodeEnum.OK, code) - # sn = n["system"] - # self.assertEqual(1, len(sn)) - # self.assertEqual(doc["_id"], sn[0]["noticeId"]) - # self.assertEqual("title", sn[0]["title"]) - # self.assertEqual("content", sn[0]["content"]) - # self.assertLess(sn[0]["publishAt"], datetime.datetime.now()) - # self.assertFalse(sn[0]["read"]) - # self.assertIsNone(sn[0]["readTime"]) + async def test_system_notice(self): + au = deepcopy(self.au) + au.u.type = const.USER_TYPE.MANAGER.id + publish_at = datetime.datetime.now() + doc, code = await core.notice.post_in_manager_delivery( + au=au, + title="title", + content="content", + recipient_type=const.notice.RecipientTypeEnum.ALL.value, + batch_type_ids=[], + publish_at=publish_at, + ) + self.assertEqual(const.CodeEnum.OK, code) + + docs, total = await core.notice.get_system_notices(0, 10) + self.assertEqual(1, len(docs)) + self.assertEqual(1, total) + self.assertEqual(doc["_id"], docs[0]["_id"]) + self.assertEqual("title", docs[0]["title"]) + self.assertEqual("content", docs[0]["content"]) + self.assertEqual(publish_at.second, docs[0]["publishAt"].second) + self.assertFalse(docs[0]["scheduled"]) + + await tasks.notice.async_deliver_unscheduled_system_notices() + time.sleep(0.01) + docs, total = await core.notice.get_system_notices(0, 10) + self.assertTrue(docs[0]["scheduled"]) + + async def test_notice(self): + au = deepcopy(self.au) + doc, code = await core.notice.post_in_manager_delivery( + au=au, + title="title", + content="content", + recipient_type=const.notice.RecipientTypeEnum.ALL.value, + batch_type_ids=[], + publish_at=None, + ) + self.assertEqual(const.CodeEnum.NOT_PERMITTED, code) + + au.u.type = const.USER_TYPE.MANAGER.id + doc, code = await core.notice.post_in_manager_delivery( + au=au, + title="title", + content="content", + recipient_type=const.notice.RecipientTypeEnum.ALL.value, + batch_type_ids=[], + publish_at=None, + ) + self.assertEqual(const.CodeEnum.OK, code) + + await tasks.notice.async_deliver_unscheduled_system_notices() + + res = await client.coll.notice_system.find().to_list(None) + self.assertEqual(1, len(res)) + self.assertEqual(doc["_id"], res[0]["noticeId"]) + self.assertEqual(au.u.id, res[0]["senderId"]) + self.assertFalse(res[0]["read"]) + + d = await client.coll.notice_manager_delivery.find_one({"_id": doc["_id"]}) + self.assertIsNotNone(d) + self.assertTrue(d["scheduled"]) + + n, code = await core.notice.get_user_notices(au) + self.assertEqual(const.CodeEnum.OK, code) + sn = n["system"] + self.assertEqual(1, len(sn)) + self.assertEqual(doc["_id"], sn[0]["noticeId"]) + self.assertEqual("title", sn[0]["title"]) + self.assertEqual("content", sn[0]["content"]) + self.assertLess(sn[0]["publishAt"], datetime.datetime.now()) + self.assertFalse(sn[0]["read"]) + self.assertIsNone(sn[0]["readTime"]) diff --git a/tests/test_core_remote.py b/tests/test_core_remote.py index c06e968..28f6880 100644 --- a/tests/test_core_remote.py +++ b/tests/test_core_remote.py @@ -1,5 +1,7 @@ +import datetime import time import unittest +from copy import deepcopy from textwrap import dedent from unittest.mock import patch @@ -10,6 +12,7 @@ from retk import const, config, core from retk.controllers.schemas.user import PatchUserRequest from retk.core.account.manager import signup +from retk.core.scheduler import tasks from retk.models import db_ops from retk.models.client import client from retk.models.tps import AuthedUser, convert_user_dict_to_authed_user @@ -497,49 +500,78 @@ async def test_md_history( config.get_settings().MD_BACKUP_INTERVAL = bi - # @utils.skip_no_connect - # async def test_notice(self): - # au = deepcopy(self.au) - # doc, code = await core.notice.post_in_manager_delivery( - # au=au, - # title="title", - # content="content", - # recipient_type=const.notice.RecipientTypeEnum.ALL.value, - # batch_type_ids=[], - # publish_at=None, - # ) - # self.assertEqual(const.CodeEnum.NOT_PERMITTED, code) - # - # au.u.type = const.USER_TYPE.MANAGER.id - # doc, code = await core.notice.post_in_manager_delivery( - # au=au, - # title="title", - # content="content", - # recipient_type=const.notice.RecipientTypeEnum.ALL.value, - # batch_type_ids=[], - # publish_at=None, - # ) - # self.assertEqual(const.CodeEnum.OK, code) - # - # await core.notice.deliver_unscheduled_system_notices() - # - # res = await client.coll.notice_system.find().to_list(None) - # self.assertEqual(1, len(res)) - # self.assertEqual(doc["_id"], res[0]["noticeId"]) - # self.assertEqual(au.u.id, res[0]["senderId"]) - # self.assertFalse(res[0]["read"]) - # - # d = await client.coll.notice_manager_delivery.find_one({"_id": doc["_id"]}) - # self.assertIsNotNone(d) - # self.assertTrue(d["scheduled"]) - # - # n, code = await core.notice.get_user_notices(au) - # self.assertEqual(const.CodeEnum.OK, code) - # sn = n["system"] - # self.assertEqual(1, len(sn)) - # self.assertEqual(doc["_id"], sn[0]["noticeId"]) - # self.assertEqual("title", sn[0]["title"]) - # self.assertEqual("content", sn[0]["content"]) - # self.assertLess(sn[0]["publishAt"], datetime.datetime.now()) - # self.assertFalse(sn[0]["read"]) - # self.assertIsNone(sn[0]["readTime"]) + @utils.skip_no_connect + async def test_system_notice(self): + au = deepcopy(self.au) + au.u.type = const.USER_TYPE.MANAGER.id + publish_at = datetime.datetime.now() + doc, code = await core.notice.post_in_manager_delivery( + au=au, + title="title", + content="content", + recipient_type=const.notice.RecipientTypeEnum.ALL.value, + batch_type_ids=[], + publish_at=publish_at, + ) + self.assertEqual(const.CodeEnum.OK, code) + + docs, total = await core.notice.get_system_notices(0, 10) + self.assertEqual(1, len(docs)) + self.assertEqual(1, total) + self.assertEqual(doc["_id"], docs[0]["_id"]) + self.assertEqual("title", docs[0]["title"]) + self.assertEqual("content", docs[0]["content"]) + self.assertEqual(publish_at.second, docs[0]["publishAt"].second) + self.assertFalse(docs[0]["scheduled"]) + + await tasks.notice.async_deliver_unscheduled_system_notices() + time.sleep(0.01) + docs, total = await core.notice.get_system_notices(0, 10) + self.assertTrue(docs[0]["scheduled"]) + + @utils.skip_no_connect + async def test_notice(self): + au = deepcopy(self.au) + doc, code = await core.notice.post_in_manager_delivery( + au=au, + title="title", + content="content", + recipient_type=const.notice.RecipientTypeEnum.ALL.value, + batch_type_ids=[], + publish_at=None, + ) + self.assertEqual(const.CodeEnum.NOT_PERMITTED, code) + + au.u.type = const.USER_TYPE.MANAGER.id + doc, code = await core.notice.post_in_manager_delivery( + au=au, + title="title", + content="content", + recipient_type=const.notice.RecipientTypeEnum.ALL.value, + batch_type_ids=[], + publish_at=None, + ) + self.assertEqual(const.CodeEnum.OK, code) + + await tasks.notice.async_deliver_unscheduled_system_notices() + + res = await client.coll.notice_system.find().to_list(None) + self.assertEqual(1, len(res)) + self.assertEqual(doc["_id"], res[0]["noticeId"]) + self.assertEqual(au.u.id, res[0]["senderId"]) + self.assertFalse(res[0]["read"]) + + d = await client.coll.notice_manager_delivery.find_one({"_id": doc["_id"]}) + self.assertIsNotNone(d) + self.assertTrue(d["scheduled"]) + + n, code = await core.notice.get_user_notices(au) + self.assertEqual(const.CodeEnum.OK, code) + sn = n["system"] + self.assertEqual(1, len(sn)) + self.assertEqual(doc["_id"], sn[0]["noticeId"]) + self.assertEqual("title", sn[0]["title"]) + self.assertEqual("content", sn[0]["content"]) + self.assertLess(sn[0]["publishAt"], datetime.datetime.now()) + self.assertFalse(sn[0]["read"]) + self.assertIsNone(sn[0]["readTime"]) diff --git a/tests/test_core_scheduler.py b/tests/test_core_scheduler.py index 44d5e33..443d193 100644 --- a/tests/test_core_scheduler.py +++ b/tests/test_core_scheduler.py @@ -108,7 +108,7 @@ def test_put_task(self): self.assertTrue(job.finished_return.startswith("test and print")) j = scheduler.get_job("test5") - self.assertIsInstance(j, scheduler.base.JobInfo) + self.assertIsInstance(j, scheduler.schedule.JobInfo) self.assertEqual("test and print '5'", j.finished_return) ji, code = scheduler.run_once_now(