Skip to content

Commit

Permalink
Version bumped to 0.23.4
Browse files Browse the repository at this point in the history
All updates handlers now are called asynchronously in background. It allows making another requests in updates handlers.

Added more debug logging around pending requests
  • Loading branch information
pylakey committed Jun 18, 2024
1 parent d6d10fb commit 6d91961
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 31 deletions.
2 changes: 1 addition & 1 deletion aiotdlib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.23.3"
__version__ = "0.23.4"

from .client import Client
from .client_settings import ClientOptions
Expand Down
86 changes: 62 additions & 24 deletions aiotdlib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
from .api import TextParseModeHTML
from .api import TextParseModeMarkdown
from .api import UpdateAuthorizationState
from .api import UpdateMessageSendFailed
from .api import UpdateMessageSendSucceeded
from .api import User
from .api import UserFullInfo
Expand Down Expand Up @@ -122,6 +123,7 @@ def __init__(self, settings: Optional[ClientSettings] = None):
self._middlewares: list[MiddlewareCallable] = []
self._middlewares_handlers: list[MiddlewareCallable] = []
self._update_task: typing.Optional[asyncio.Task[None]] = None
self._handlers_tasks: set[asyncio.Task] = set()
self.settings = settings or ClientSettings()
self.tdjson_client = TDJsonClient.create(self.settings.library_path)
self.logger = logging.getLogger(f"{self.__class__.__name__}_{self.tdjson_client.client_id}")
Expand Down Expand Up @@ -219,6 +221,11 @@ async def _call_handlers(self, update: TDLibObject):
# Running all handlers concurrently and independently
await asyncio.gather(*tasks, return_exceptions=True)

def _create_handler_task(self, coro):
task: asyncio.Task = asyncio.create_task(coro)
self._handlers_tasks.add(task)
task.add_done_callback(self._handlers_tasks.discard)

async def _handle_update(self, update: TDLibObject):
if len(self._middlewares_handlers) == 0:
return await self._call_handlers(update)
Expand All @@ -231,33 +238,45 @@ async def _fn(*_, **__):
for m in self._middlewares_handlers:
call_next = update_wrapper(partial(m, call_next=call_next), call_next)

return await call_next(self, update)
try:
return await call_next(self, update)
except asyncio.CancelledError:
raise
except Exception as e:
self.logger.error(f'Unable to handle update {update}! {e}', exc_info=True)

async def _handle_pending_request(self, update: TDLibObject):
request_id = update.EXTRA.get('request_id')

if bool(request_id):
pending_request = self._pending_requests.get(request_id)

if bool(pending_request):
if isinstance(update, Message) and isinstance(update.sending_state, MessageSendingStatePending):
self._pending_messages[f"{update.chat_id}_{update.id}"] = update
else:
self._pending_requests.pop(request_id)
pending_request.set_update(update)
if isinstance(update, Message) and isinstance(update.sending_state, MessageSendingStatePending):
# MessageSendingStateFailed will be set as an error to pending request, no need to handle it here
sending_id = f"{update.chat_id}_{update.id}"
self.logger.info(f"Put message to pending messages: {sending_id}")
self._pending_messages[sending_id] = update
return

if isinstance(update, UpdateMessageSendSucceeded):
pending_message_key = f"{update.message.chat_id}_{update.old_message_id}"
pending_message = self._pending_messages.pop(pending_message_key, None)
if isinstance(update, (UpdateMessageSendSucceeded, UpdateMessageSendFailed)):
sending_id = f"{update.message.chat_id}_{update.old_message_id}"
pending_message = self._pending_messages.pop(sending_id, None)

if bool(pending_message):
request_id = pending_message.EXTRA.get('request_id')
pending_request = self._pending_requests.get(request_id)

if bool(pending_request):
update.message.EXTRA['request_id'] = request_id
self._pending_requests.pop(request_id)
pending_request.set_update(update.message)
if isinstance(update, UpdateMessageSendFailed):
self.logger.debug(f"Message %s sending failed", sending_id)
update = update.error
else:
self.logger.debug(f"Message %s sending succeeded", sending_id)
update = update.message

pending_request = self._pending_requests.pop(request_id, None)

if bool(pending_request):
pending_request.set_update(update)
self.logger.debug(
f"Pending request {request_id} is successfully processed."
f"Total pending: {len(self._pending_requests)}"
)

async def _updates_loop(self):
try:
Expand Down Expand Up @@ -292,12 +311,7 @@ async def _updates_loop(self):
except Exception as e:
self.logger.error(f'Unable to handle pending request {update}! {e}', exc_info=True)

try:
await self._handle_update(update)
except asyncio.CancelledError:
raise
except Exception as e:
self.logger.error(f'Unable to handle update {update}! {e}', exc_info=True)
self._create_handler_task(self._handle_update(update))
except asyncio.CancelledError:
self._pending_requests.clear()
self._pending_messages.clear()
Expand Down Expand Up @@ -567,6 +581,20 @@ async def _cleanup(self):
except asyncio.CancelledError:
pass

# Cancel all background handlers tasks
if bool(self._handlers_tasks):
self.logger.info(f"Cancelling {len(self._handlers_tasks)} background handlers tasks")

for task in self._handlers_tasks:
# noinspection PyBroadException
try:
task.cancel()
except Exception:
pass

# Wait for all tasks to be cancelled
await asyncio.wait(self._handlers_tasks, return_when=asyncio.ALL_COMPLETED)

if bool(self.cache):
self.cache.clear()

Expand Down Expand Up @@ -609,11 +637,19 @@ async def request(
query.EXTRA['request_id'] = request_id
pending_request = PendingRequest(self, query)
self._pending_requests[request_id] = pending_request
self.logger.debug(
f"Pending request {query.ID} with request_id {request_id} created."
f"Total pending: {len(self._pending_requests)}"
)

try:
await self.send(query)
await pending_request.wait(raise_exc=True, timeout=request_timeout)
except (asyncio.TimeoutError, TimeoutError):
self.logger.debug(
f"Request {query.ID} with request_id {request_id} has been timed out. "
f"Total pending: {len(self._pending_requests)}"
)
self._pending_requests.pop(request_id, None)
raise
finally:
Expand Down Expand Up @@ -684,6 +720,8 @@ async def idle(self):
try:
while True:
await asyncio.sleep(0.1)
except asyncio.CancelledError:
pass
finally:
self.logger.info('Stop Idling...')

Expand Down
8 changes: 3 additions & 5 deletions aiotdlib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,11 @@ def parse_tdlib_object(data: dict) -> TDLibObject:


class PendingRequest:
request: Optional[TDLibObject] = None
update: Optional[TDLibObject] = None
error: bool = False

def __init__(self, client: 'Client', request: TDLibObject) -> None:
self.client = client
self.request = request
self.request: Optional[TDLibObject] = request
self.update: Optional[TDLibObject] = None
self.error: bool = False
self._ready_event = asyncio.Event()

@property
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aiotdlib"
version = "0.23.3"
version = "0.23.4"
description = "Python asyncio Telegram client based on TDLib"
authors = ["pylakey <[email protected]>"]
license = "MIT"
Expand Down

0 comments on commit 6d91961

Please sign in to comment.