Skip to content

Commit

Permalink
mypy: Add annotation for new version of mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
jpmckinney committed Mar 21, 2024
1 parent 66d663c commit 7bcfdfc
Showing 1 changed file with 9 additions and 10 deletions.
19 changes: 9 additions & 10 deletions yapw/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
"""
from __future__ import annotations

import functools
import logging
import signal
import threading
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from functools import partial
from typing import TYPE_CHECKING, Any, Generic, Never, TypeVar

import pika
from pika.adapters.asyncio_connection import AsyncioConnection
Expand All @@ -26,6 +26,7 @@
from yapw.util import basic_publish_debug_args, basic_publish_kwargs, default_decode, default_encode

if TYPE_CHECKING:
from asyncio import Future
from collections.abc import Callable
from types import FrameType

Expand Down Expand Up @@ -274,9 +275,7 @@ def consume(
self.channel.add_on_cancel_callback(self.channel_cancel_callback)

self.executor = ThreadPoolExecutor(thread_name_prefix=f"yapw-{queue}")
cb = functools.partial(
_on_message, args=(self.executor.submit, decorator, self.decode, on_message_callback, self.state)
)
cb = partial(_on_message, args=(self.executor.submit, decorator, self.decode, on_message_callback, self.state))

self.consumer_tag = self.channel.basic_consume(queue_name, cb)
logger.debug("Consuming messages on channel %s from queue %s", self.channel.channel_number, queue_name)
Expand Down Expand Up @@ -460,7 +459,7 @@ def add_signal_handler(self, signalnum: int, handler: Callable[..., object]) ->
"""
Add a handler for a signal.
"""
self.connection.ioloop.add_signal_handler(signalnum, functools.partial(handler, signalnum=signalnum))
self.connection.ioloop.add_signal_handler(signalnum, partial(handler, signalnum=signalnum))

def _on_signal_callback(self, signalnum: int) -> None:
if not self.stopping: # remove_signal_handler() is too slow
Expand Down Expand Up @@ -597,7 +596,7 @@ def thread_name_infix(self) -> str:
def exchange_ready(self) -> None:
"""Declare the queue, once the exchange is declared."""
queue_name = self.format_routing_key(self.queue)
cb = functools.partial(self.queue_declareok_callback, queue_name=queue_name)
cb = partial(self.queue_declareok_callback, queue_name=queue_name)
self.channel.queue_declare(queue=queue_name, durable=self.durable, arguments=self.arguments, callback=cb)

def queue_declareok_callback(self, method: pika.frame.Method[pika.spec.Queue.DeclareOk], queue_name: str) -> None:
Expand All @@ -615,7 +614,7 @@ def queue_bindok_callback(

def _bind_queue(self, queue_name: str, index: int) -> None:
routing_key = self.format_routing_key(self.routing_keys[index])
cb = functools.partial(self.queue_bindok_callback, queue_name=queue_name, index=index + 1)
cb = partial(self.queue_bindok_callback, queue_name=queue_name, index=index + 1)
self.channel.queue_bind(queue=queue_name, exchange=self.exchange, routing_key=routing_key, callback=cb)

def consume(self, on_message_callback: ConsumerCallback, decorator: Decorator, queue_name: str) -> None:
Expand All @@ -631,8 +630,8 @@ def consume(self, on_message_callback: ConsumerCallback, decorator: Decorator, q
"""
self.channel.add_on_cancel_callback(self.channel_cancel_callback)

submit = functools.partial(self.connection.ioloop.run_in_executor, self.executor)
cb = functools.partial(_on_message, args=(submit, decorator, self.decode, on_message_callback, self.state))
submit: partial[Future[Never]] = partial(self.connection.ioloop.run_in_executor, self.executor)
cb = partial(_on_message, args=(submit, decorator, self.decode, on_message_callback, self.state))

self.consumer_tag = self.channel.basic_consume(queue_name, cb, callback=self.channel_consumeok_callback)
logger.debug("Consuming messages on channel %s from queue %s", self.channel.channel_number, queue_name)
Expand Down

0 comments on commit 7bcfdfc

Please sign in to comment.