Skip to content

Commit

Permalink
feature(app):
Browse files Browse the repository at this point in the history
- fix unittest
- add get system notices
  • Loading branch information
MorvanZhou committed May 16, 2024
1 parent cff679d commit ca5e35c
Show file tree
Hide file tree
Showing 13 changed files with 395 additions and 220 deletions.
19 changes: 19 additions & 0 deletions src/retk/controllers/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,22 @@ async def post_in_manager_delivery(
return schemas.RequestIdResponse(
requestId=au.request_id,
)


async def get_system_notices(
        au: AuthedUser,
        page: int,
        limit: int,
) -> schemas.manager.GetSystemNoticesResponse:
    """Return one page of system notices for the manager console.

    Each notice's Mongo ``_id`` is exposed to the client as a string
    ``id`` field instead of the raw ObjectId.
    """
    notices, total = await notice.get_system_notices(page=page, limit=limit)
    # Swap the ObjectId "_id" key for a JSON-friendly string "id".
    reshaped = [
        {**{k: v for k, v in n.items() if k != "_id"}, "id": str(n["_id"])}
        for n in notices
    ]
    return schemas.manager.GetSystemNoticesResponse(
        requestId=au.request_id,
        notices=reshaped,
        total=total,
    )
17 changes: 17 additions & 0 deletions src/retk/controllers/schemas/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,20 @@ class ManagerNoticeDeliveryRequest(BaseModel):
description="list of user ID if the recipientType is batch"
)
publishAt: datetime = Field(..., description="publish time")


class GetSystemNoticesResponse(BaseModel):
    """Response body for the manager "get system notices" endpoint."""

    class Notice(BaseModel):
        """A single system notice created via the manager delivery flow."""

        id: str = Field(description="notice ID")
        title: str = Field(description="title")
        content: str = Field(description="content")
        recipientType: notice.RecipientTypeEnum = Field(description="recipient type")
        batchTypeIds: List[str] = Field(description="list of user ID if the recipientType is batch")
        publishAt: datetime = Field(description="publish time")
        scheduled: bool = Field(description="scheduled")

    requestId: str = Field(max_length=settings.REQUEST_ID_MAX_LENGTH, description="request ID")
    notices: List[Notice] = Field(
        description="list of notices"
    )
    total: int = Field(description="total number of notices")
118 changes: 16 additions & 102 deletions src/retk/core/notice.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from datetime import datetime
from typing import List, Dict, Optional, Tuple

Expand All @@ -22,6 +21,13 @@ async def post_in_manager_delivery(
return None, const.CodeEnum.NOT_PERMITTED
if publish_at is None:
publish_at = datetime.now(tz=utc)

# if publish_at is not utc, convert it to utc
if publish_at.tzinfo is None or publish_at.tzinfo.utcoffset(publish_at) is None:
publish_at = publish_at.replace(tzinfo=utc)
elif publish_at.tzinfo != utc:
publish_at = publish_at.astimezone(utc)

# add system notice
notice: NoticeManagerDelivery = {
"_id": ObjectId(),
Expand All @@ -40,107 +46,15 @@ async def post_in_manager_delivery(
return notice, const.CodeEnum.OK


async def __get_users_in_batches(batch_size=100):
    """Yield all users in descending ``_id`` order, ``batch_size`` at a time.

    A fresh cursor is built for every batch: ``to_list`` exhausts a motor
    cursor, and calling ``skip``/``limit`` on a used cursor raises
    ``InvalidOperation`` — the previous single shared cursor could only
    ever produce the first batch.
    """
    # Total count taken once up front; users added mid-iteration may be
    # missed, which is acceptable for best-effort notice delivery.
    total_users = await client.coll.users.count_documents({})
    for offset in range(0, total_users, batch_size):
        if config.is_local_db():
            # local (mongita) backend: fetch full documents
            cursor = client.coll.users.find({})
        else:
            # remote backend: only the "id" field is needed downstream
            cursor = client.coll.users.find({}, projection=["id"])
        users = await cursor.sort(
            [("_id", -1)]
        ).skip(offset).limit(batch_size).to_list(None)
        yield users


async def __deliver_scheduled_system_notices_batch(
        users: List[Dict[str, str]],
        sender_id: str,
        notice_id: ObjectId
):
    """Insert one unread per-user system notice for every user in *users*."""
    docs = []
    for user in users:
        docs.append({
            "_id": ObjectId(),
            "senderId": sender_id,
            "recipientId": user["id"],
            "noticeId": notice_id,
            "read": False,
            "readTime": None,
        })
    # Single bulk insert instead of one round-trip per user.
    await client.coll.notice_system.insert_many(docs)


def deliver_unscheduled_system_notices():
    """Deliver every system notice whose publish time has passed.

    Synchronous entry point intended for a scheduler thread: it runs the
    async delivery in a private event loop, fans each unscheduled manager
    delivery out to per-user ``notice_system`` documents, then marks the
    delivery as scheduled.

    Raises:
        ValueError: if a delivery carries an unknown recipient type.
    """

    async def _deliver_unscheduled_system_notices():
        unscheduled = await client.coll.notice_manager_delivery.find({
            "scheduled": False,
        }).to_list(None)
        now = datetime.now(tz=utc)
        for delivery in unscheduled:
            notice_id = delivery["_id"]
            sender_id = delivery["senderId"]
            recipient_type = delivery["recipientType"]
            publish_at = delivery["publishAt"]
            # MongoDB stores datetimes as UTC but returns them naive;
            # attach UTC explicitly — comparing naive vs aware raises
            # TypeError.
            if publish_at.tzinfo is None:
                publish_at = publish_at.replace(tzinfo=utc)
            if publish_at > now:
                continue  # not due yet; leave it unscheduled
            if recipient_type == const.notice.RecipientTypeEnum.ALL.value:
                async for users in __get_users_in_batches(batch_size=100):
                    await __deliver_scheduled_system_notices_batch(
                        users=users,
                        sender_id=sender_id,
                        notice_id=notice_id,
                    )
            elif recipient_type == const.notice.RecipientTypeEnum.BATCH.value:
                # explicit recipient list carried on the delivery itself
                notices = [{
                    "_id": ObjectId(),
                    "senderId": sender_id,
                    "recipientId": user_id,
                    "noticeId": notice_id,
                    "read": False,
                    "readTime": None,
                } for user_id in delivery["batchTypeIds"]]
                await client.coll.notice_system.insert_many(notices)
            elif recipient_type == const.notice.RecipientTypeEnum.ADMIN.value:
                # projection must be a mapping {"id": 1}; the original
                # passed the set literal {"id", 1}, which pymongo rejects
                admins = await client.coll.users.find(
                    {"type": const.USER_TYPE.ADMIN.value}, {"id": 1}).to_list(None)
                await __deliver_scheduled_system_notices_batch(
                    users=admins,
                    sender_id=sender_id,
                    notice_id=notice_id,
                )
            elif recipient_type == const.notice.RecipientTypeEnum.MANAGER.value:
                managers = await client.coll.users.find(
                    {"type": const.USER_TYPE.MANAGER.value}, {"id": 1}).to_list(None)
                await __deliver_scheduled_system_notices_batch(
                    users=managers,
                    sender_id=sender_id,
                    notice_id=notice_id,
                )
            else:
                raise ValueError(f"Unknown recipient type: {recipient_type}")

            # Flag as scheduled only after the fan-out above succeeded.
            await client.coll.notice_manager_delivery.update_one(
                {"_id": notice_id},
                {"$set": {"scheduled": True}},
            )

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # close the loop even when delivery raises (the original leaked it)
        res = loop.run_until_complete(_deliver_unscheduled_system_notices())
    finally:
        loop.close()
    return res
async def get_system_notices(
        page: int,
        limit: int,
) -> Tuple[List[NoticeManagerDelivery], int]:
    """Fetch one page of manager-delivery notices plus the total count.

    ``page`` is zero-based: the query skips ``page * limit`` documents
    before taking up to ``limit`` of them.
    """
    coll = client.coll.notice_manager_delivery
    total = await coll.count_documents({})
    cursor = coll.find({}).skip(page * limit).limit(limit)
    notices = await cursor.to_list(None)
    return notices, total


async def get_user_notices(au: AuthedUser) -> Tuple[Dict[str, List[Dict]], const.CodeEnum]:
Expand Down
2 changes: 1 addition & 1 deletion src/retk/core/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from . import tasks
from . import timing
from .base import ( # noqa: F401
from .schedule import ( # noqa: F401
init_tasks,
start,
stop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from retk import const
from retk.const.settings import MAX_SCHEDULE_JOB_INFO_LEN
from . import tasks

"""
- BlockingScheduler:
Expand Down Expand Up @@ -88,11 +89,11 @@ def wrapper(*args, **kwargs):

def init_tasks():
    """Register the recurring background jobs with the scheduler."""
    # Deliver any unscheduled system notices at second 0 of every minute.
    run_every_at(
        second=0,
        job_id="deliver_unscheduled_system_notices",
        func=tasks.notice.deliver_unscheduled_system_notices,
    )


Expand Down
1 change: 1 addition & 0 deletions src/retk/core/scheduler/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from . import ( # noqa: F401
email,
notice,
)
120 changes: 120 additions & 0 deletions src/retk/core/scheduler/tasks/notice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import asyncio
from datetime import datetime
from typing import List, Dict

from bson import ObjectId
from bson.tz_util import utc

from retk import const, config
from retk.models.client import init_mongo


async def __get_users_in_batches(db, batch_size=100):
    """Yield all users in descending ``_id`` order, ``batch_size`` at a time.

    A fresh cursor is built for every batch: ``to_list`` exhausts a motor
    cursor, and calling ``skip``/``limit`` on a used cursor raises
    ``InvalidOperation`` — the previous single shared cursor could only
    ever produce the first batch.
    """
    # Count taken once up front; users created mid-iteration may be
    # missed, which is acceptable for best-effort notice delivery.
    total_users = await db["users"].count_documents({})
    for offset in range(0, total_users, batch_size):
        if config.is_local_db():
            # local (mongita) backend: fetch full documents
            cursor = db["users"].find({})
        else:
            # remote backend: only the "id" field is needed downstream
            cursor = db["users"].find({}, projection=["id"])
        users = await cursor.sort(
            [("_id", -1)]
        ).skip(offset).limit(batch_size).to_list(None)
        yield users


async def __deliver_scheduled_system_notices_batch(
        db,
        users: List[Dict[str, str]],
        sender_id: str,
        notice_id: ObjectId
):
    """Insert one unread per-user system notice for every user in *users*."""
    docs = []
    for user in users:
        docs.append({
            "_id": ObjectId(),
            "senderId": sender_id,
            "recipientId": user["id"],
            "noticeId": notice_id,
            "read": False,
            "readTime": None,
        })
    # Single bulk insert instead of one round-trip per user.
    await db["noticeSystem"].insert_many(docs)


def deliver_unscheduled_system_notices():
    """Synchronous scheduler entry point for the async delivery task.

    ``asyncio.run`` replaces the manual new_event_loop /
    run_until_complete / close sequence: it creates a fresh loop, runs the
    coroutine, and always closes the loop — the manual version leaked the
    loop when the coroutine raised.
    """
    return asyncio.run(async_deliver_unscheduled_system_notices())


async def async_deliver_unscheduled_system_notices():
    """Deliver every pending system notice whose publish time has passed.

    Opens a dedicated Mongo connection (the scheduler task runs outside
    the app's main client), expands each unscheduled
    ``noticeManagerDelivery`` document into per-recipient ``noticeSystem``
    documents, then flags the delivery as scheduled.

    Raises:
        ValueError: if a delivery carries an unknown recipient type.
    """
    _, db = init_mongo(connection_timeout=5)
    unscheduled = await db["noticeManagerDelivery"].find({
        "scheduled": False,
    }).to_list(None)
    now = datetime.now(tz=utc)
    for delivery in unscheduled:
        notice_id = delivery["_id"]
        sender_id = delivery["senderId"]
        recipient_type = delivery["recipientType"]
        publish_at = delivery["publishAt"]
        # MongoDB stores datetimes as UTC but returns them naive. Attach
        # UTC with replace(): .astimezone(utc) on a naive value would
        # wrongly assume the machine's *local* zone before converting.
        if publish_at.tzinfo is None:
            publish_at = publish_at.replace(tzinfo=utc)
        if publish_at > now:
            continue  # not due yet; leave it unscheduled
        if recipient_type == const.notice.RecipientTypeEnum.ALL.value:
            async for users in __get_users_in_batches(db, batch_size=100):
                await __deliver_scheduled_system_notices_batch(
                    db=db,
                    users=users,
                    sender_id=sender_id,
                    notice_id=notice_id,
                )
        elif recipient_type == const.notice.RecipientTypeEnum.BATCH.value:
            # explicit recipient list carried on the delivery itself
            notices = [{
                "_id": ObjectId(),
                "senderId": sender_id,
                "recipientId": user_id,
                "noticeId": notice_id,
                "read": False,
                "readTime": None,
            } for user_id in delivery["batchTypeIds"]]
            await db["noticeSystem"].insert_many(notices)
        elif recipient_type == const.notice.RecipientTypeEnum.ADMIN.value:
            # projection must be a mapping {"id": 1}; the original passed
            # the set literal {"id", 1}, which pymongo rejects
            admins = await db["users"].find(
                {"type": const.USER_TYPE.ADMIN.value}, {"id": 1}).to_list(None)
            await __deliver_scheduled_system_notices_batch(
                db=db,
                users=admins,
                sender_id=sender_id,
                notice_id=notice_id,
            )
        elif recipient_type == const.notice.RecipientTypeEnum.MANAGER.value:
            managers = await db["users"].find(
                {"type": const.USER_TYPE.MANAGER.value}, {"id": 1}).to_list(None)
            await __deliver_scheduled_system_notices_batch(
                db=db,
                users=managers,
                sender_id=sender_id,
                notice_id=notice_id,
            )
        else:
            raise ValueError(f"Unknown recipient type: {recipient_type}")

        # Flag as scheduled only after the fan-out above succeeded.
        await db["noticeManagerDelivery"].update_one(
            {"_id": notice_id},
            {"$set": {"scheduled": True}},
        )
    return
39 changes: 22 additions & 17 deletions src/retk/models/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@
pass


def init_mongo(connection_timeout: int):
    """Build a Mongo client and database handle from app settings.

    In local mode this is a ``MongitaClientDisk`` rooted under
    ``RETHINK_LOCAL_STORAGE_PATH/.data/db``; otherwise an
    ``AsyncIOMotorClient`` connected with the configured credentials.

    Args:
        connection_timeout: socket timeout in seconds (remote mode only).

    Returns:
        Tuple of ``(client, database)``.

    Raises:
        FileNotFoundError: if the local storage path does not exist.
    """
    conf = config.get_settings()
    if config.is_local_db():
        if not conf.RETHINK_LOCAL_STORAGE_PATH.exists():
            raise FileNotFoundError(f"Path not exists: {conf.RETHINK_LOCAL_STORAGE_PATH}")
        db_path = conf.RETHINK_LOCAL_STORAGE_PATH / ".data" / "db"
        # create the on-disk database directory on first run
        db_path.mkdir(parents=True, exist_ok=True)
        mongo = MongitaClientDisk(db_path)
    else:
        mongo = AsyncIOMotorClient(
            host=conf.DB_HOST,
            port=conf.DB_PORT,
            username=conf.DB_USER,
            password=conf.DB_PASSWORD,
            socketTimeoutMS=1000 * connection_timeout,
        )

    # reuse the settings object already in hand instead of a redundant
    # second config.get_settings() call
    db = mongo[conf.DB_NAME]
    return mongo, db


class Client:
coll: Collections = Collections()
mongo: Optional[Union["AsyncIOMotorClient", MongitaClientDisk]] = None
Expand Down Expand Up @@ -51,23 +72,7 @@ async def init(self):
await self.try_restore_search()

def init_mongo(self):
conf = config.get_settings()
if config.is_local_db():
if not conf.RETHINK_LOCAL_STORAGE_PATH.exists():
raise FileNotFoundError(f"Path not exists: {conf.RETHINK_LOCAL_STORAGE_PATH}")
db_path = conf.RETHINK_LOCAL_STORAGE_PATH / ".data" / "db"
db_path.mkdir(parents=True, exist_ok=True)
self.mongo = MongitaClientDisk(db_path)
else:
self.mongo = AsyncIOMotorClient(
host=conf.DB_HOST,
port=conf.DB_PORT,
username=conf.DB_USER,
password=conf.DB_PASSWORD,
socketTimeoutMS=1000 * self.connection_timeout,
)

db = self.mongo[config.get_settings().DB_NAME]
self.mongo, db = init_mongo(self.connection_timeout)
self.coll.users = db["users"]
self.coll.nodes = db["nodes"]
self.coll.import_data = db["importData"]
Expand Down
Loading

0 comments on commit ca5e35c

Please sign in to comment.