
Commit 2c28bdd
updating black
DanielePalaia committed Apr 11, 2024
1 parent 6341a9c commit 2c28bdd
Showing 20 changed files with 41 additions and 133 deletions.
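Most of the hunks below delete a single blank line at the top of a function or method body. That pattern is consistent with black 24's stable style, which removes empty lines at the beginning of a block; a minimal before/after sketch of that rule (illustrative plain Python, not from the commit):

# before: black 23.x tolerated a blank line at the top of a body
def greet():

    print("hello")

# after: black 24.x strips the leading blank line
def greet():
    print("hello")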
1 change: 0 additions & 1 deletion docs/examples/basic_producers/producer_send.py
@@ -8,7 +8,6 @@


async def publish():

async with Producer("localhost", username="guest", password="guest") as producer:
# create a stream if it doesn't already exist
await producer.create_stream(STREAM, exists_ok=True)
2 changes: 0 additions & 2 deletions docs/examples/deduplication/producer_ded.py
@@ -9,7 +9,6 @@


async def publish():

async with Producer("localhost", username="guest", password="guest") as producer:
# create a stream if it doesn't already exist
await producer.create_stream(STREAM, exists_ok=True)
@@ -19,7 +18,6 @@ async def publish():

for j in range(LOOP_1):
for i in range(LOOP_2):

await producer.send(
STREAM,
# just 1000 messages will be inserted as messages with the same publishing_id and publisher_name will be discarded
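The comment in the hunk above relies on broker-side deduplication: of the LOOP_1 * LOOP_2 sends, only 1000 messages are stored, because repeats of the same publisher_name/publishing_id pair are discarded. A simplified stdlib-only sketch of that rule (this models the broker's behavior, it is not the rstream API):

# Simplified model of stream deduplication: a message is dropped when its
# publishing_id is not greater than the last id stored for that publisher.
last_ids: dict[str, int] = {}

def accept(publisher_name: str, publishing_id: int) -> bool:
    if publishing_id <= last_ids.get(publisher_name, -1):
        return False  # duplicate (or older) id: discarded
    last_ids[publisher_name] = publishing_id
    return True

assert accept("my-producer", 0) is True
assert accept("my-producer", 0) is False  # same pair, dropped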
1 change: 0 additions & 1 deletion docs/examples/filtering/producer_filtering.py
@@ -12,7 +12,6 @@ async def filter_value_extractor(message: AMQPMessage) -> str:


async def publish():

async with Producer(
"localhost", username="guest", password="guest", filter_value_extractor=filter_value_extractor
) as producer:
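Here the producer is built with a filter_value_extractor callback, whose body is elided in the diff; it maps each outgoing message to the value used by RabbitMQ's server-side stream filtering. A hypothetical extractor body, assuming AMQPMessage exposes an application_properties mapping (the names are illustrative, not from the commit):

async def filter_value_extractor(message: AMQPMessage) -> str:
    # assumed shape: derive the filter value from a message property
    return str(message.application_properties.get("region", ""))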
36 changes: 13 additions & 23 deletions docs/examples/reliable_client/BestPracticesClient.py
@@ -61,7 +61,6 @@ async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer:
stream_name = rabbitmq_data["StreamName"]

if bool(rabbitmq_data["SuperStream"]) is False:

producer = Producer(
host=host,
username=username,
@@ -72,7 +71,6 @@ async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer:
)

else:

producer = SuperStreamProducer( # type: ignore
host=host,
username=username,
@@ -90,7 +88,6 @@ async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer:

# metadata and disconnection events for consumers
async def on_close_connection(on_closed_info: OnClosedErrorInfo) -> None:

print(
"connection has been closed from stream: "
+ str(on_closed_info.streams)
@@ -120,7 +117,6 @@ async def on_close_connection(on_closed_info: OnClosedErrorInfo) -> None:

# Make consumers
async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer:

host = rabbitmq_data["Host"]
username = rabbitmq_data["Username"]
password = rabbitmq_data["Password"]
@@ -130,7 +126,6 @@ async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer:
stream_name = rabbitmq_data["StreamName"]

if bool(rabbitmq_data["SuperStream"]) is False:

consumer = Consumer(
host=host,
username=username,
@@ -142,7 +137,6 @@ async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer:
)

else:

consumer = SuperStreamConsumer( # type: ignore
host=host,
username=username,
@@ -181,7 +175,6 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext):


async def publish(rabbitmq_configuration: dict):

global producer
global messages_per_producer

@@ -239,7 +232,6 @@ async def publish(rabbitmq_configuration: dict):


async def consume(rabbitmq_configuration: dict):

global consumer

is_super_stream_scenario = bool(rabbitmq_configuration["SuperStream"])
@@ -270,7 +262,6 @@ async def consume(rabbitmq_configuration: dict):


async def close(producer_task: asyncio.Task, consumer_task: asyncio.Task, printer_test_task: asyncio.Task):

global producer
global consumer

@@ -286,7 +277,6 @@ async def close(producer_task: asyncio.Task, consumer_task: asyncio.Task, printe


async def main():

loop = asyncio.get_event_loop()
loop.add_signal_handler(
signal.SIGINT, lambda: asyncio.create_task(close(producer_task, consumer_task, printer_test_task))
@@ -295,20 +285,20 @@ async def main():
configuration = await load_json_file("appsettings.json")
rabbitmq_configuration = configuration["RabbitMQ"]

# match not supported by mypy we need to fall back to if... else...
log_type = rabbitmq_configuration["Logging"]
match log_type:
case "":
logging.basicConfig(level=logging.INFO)
logging.getLogger("rstream").setLevel(logging.INFO)
case "info":
logging.basicConfig(level=logging.INFO)
logging.getLogger("rstream").setLevel(logging.INFO)
case "debug":
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("rstream").setLevel(logging.DEBUG)
case "error":
logging.basicConfig(level=logging.ERROR)
logging.getLogger("rstream").setLevel(logging.ERROR)
if log_type == "":
logging.basicConfig(level=logging.INFO)
logging.getLogger("rstream").setLevel(logging.INFO)
elif log_type == "info":
logging.basicConfig(level=logging.INFO)
logging.getLogger("rstream").setLevel(logging.INFO)
elif log_type == "debug":
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("rstream").setLevel(logging.DEBUG)
else:
logging.basicConfig(level=logging.ERROR)
logging.getLogger("rstream").setLevel(logging.ERROR)

producer_task = None
consumer_task = None
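The last hunk above swaps a match statement for an if/elif chain, and the added comment blames mypy. That squares with the pyproject.toml below, which pins mypy = "^0.910"; structural pattern matching support only arrived in later mypy releases (around 0.942). A hypothetical, untested alternative that avoids both constructs is a lookup table:

import logging

# Hypothetical alternative (not in the commit): resolve the level once,
# defaulting to ERROR exactly like the else branch above.
LOG_LEVELS = {
    "": logging.INFO,
    "info": logging.INFO,
    "debug": logging.DEBUG,
}

def configure_logging(log_type: str) -> None:
    level = LOG_LEVELS.get(log_type, logging.ERROR)
    logging.basicConfig(level=level)
    logging.getLogger("rstream").setLevel(level)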
@@ -46,7 +46,6 @@ async def consumer_update_handler_offset(is_active: bool, event_context: EventCo

async def consume():
try:

print("Starting Super Stream Consumer")
consumer = SuperStreamConsumer(
host="localhost",
55 changes: 27 additions & 28 deletions poetry.lock

Generated file; diff not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -21,7 +21,7 @@ isort = "^5.9.3"
mypy = "^0.910"
pytest = "^7.4.0"
pytest-asyncio = "^0.15.1"
black = "^24.0"
black = "^23.12.1"
requests = "^2.31.0"
mmh3 = "^4.0.0"

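For reference, Poetry's caret operator pins the major version: "^23.12.1" resolves to ">=23.12.1,<24.0.0" while "^24.0" resolves to ">=24.0,<25.0", so the two lines above select mutually exclusive major versions of black.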
6 changes: 0 additions & 6 deletions rstream/client.py
@@ -237,15 +237,13 @@ async def start(self) -> None:
self.add_handler(schema.Close, self._on_close)

async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) -> None:

if subscriber_name not in self._frames:
self.start_task(
f"run_delivery_handlers_{subscriber_name}",
self._run_delivery_handlers(subscriber_name, handler),
)

async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]):

while self.is_connection_alive():
frame_entry = await self._frames[subscriber_name].get()
try:
@@ -336,7 +334,6 @@ async def close(self) -> None:
connection_is_broken = True

if self._conn is not None and self.is_connection_alive():

if self.is_started:
try:
await asyncio.wait_for(
@@ -481,7 +478,6 @@ async def query_leader_and_replicas(
self,
stream: str,
) -> tuple[schema.Broker, list[schema.Broker]]:

while True:
metadata_resp = await self.sync_request(
schema.Metadata(
@@ -617,7 +613,6 @@ async def partitions(self, super_stream: str) -> list[str]:
return resp.streams

async def consumer_update(self, correlation_id: int, offset_specification: OffsetSpecification) -> None:

await self.send_frame(
schema.ConsumerUpdateServerResponse(
correlation_id=correlation_id,
@@ -641,7 +636,6 @@ async def route(self, routing_key: str, super_stream: str) -> list[str]:
async def exchange_command_version(
self, command_info: schema.FrameHandlerInfo
) -> schema.FrameHandlerInfo:

command_versions_input = []
command_versions_input.append(command_info)
resp = await self.sync_request(
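run_queue_listener_task above starts one _run_delivery_handlers task per subscriber, and each task drains that subscriber's own frame queue for as long as the connection is alive. A stdlib-only sketch of the per-subscriber queue/task pattern (simplified; the real loop checks is_connection_alive() and dispatches rstream frames):

import asyncio

async def handle(frame: str) -> None:
    print("handled", frame)

async def run_delivery_handlers(queue: asyncio.Queue) -> None:
    # one such task per subscriber, each with its own queue
    while True:
        frame = await queue.get()
        await handle(frame)
        queue.task_done()

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(run_delivery_handlers(queue))
    await queue.put("deliver-frame")
    await queue.join()  # block until the frame has been handled
    task.cancel()

asyncio.run(main())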
2 changes: 0 additions & 2 deletions rstream/compression.py
@@ -100,7 +100,6 @@ class GzipCompressionCodec(ICompressionCodec):
buffer: bytes = bytes()

def compress(self, messages: list[MessageT]):

uncompressed_data = bytes()
for item in messages:
msg = RawMessage(item) if isinstance(item, bytes) else item
@@ -114,7 +113,6 @@ def compress(self, messages: list[MessageT]):
self.compressed_data_size = len(self.buffer)

def uncompress(self, compressed_data: bytes, uncompressed_data_size: int) -> bytes:

uncompressed_data = gzip.decompress(compressed_data)

if len(uncompressed_data) != uncompressed_data_size:
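The uncompress method above checks that the inflated payload has exactly the size recorded when the sub-batch was compressed. A stdlib-only round trip showing the invariant being enforced:

import gzip

original = b"hello rstream " * 64
compressed = gzip.compress(original)
restored = gzip.decompress(compressed)

# the invariant uncompress() verifies: the inflated size equals the size
# recorded at compression time
assert restored == original and len(restored) == len(original)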
9 changes: 0 additions & 9 deletions rstream/consumer.py
@@ -184,7 +184,6 @@ async def _create_subscriber(
offset_type: OffsetType,
offset: Optional[int],
) -> _Subscriber:

logger.debug("_create_subscriber(): Create subscriber")
client = await self._get_or_create_client(stream)

@@ -219,7 +218,6 @@ async def subscribe(
consumer_update_listener: Optional[Callable[[bool, EventContext], Awaitable[Any]]] = None,
filter_input: Optional[FilterConfiguration] = None,
) -> str:

logger.debug("Consumer subscribe()")
if offset_specification is None:
offset_specification = ConsumerOffsetSpecification(OffsetType.FIRST, None)
@@ -368,7 +366,6 @@ def _filter_messages(
async def _on_deliver(
self, frame: schema.Deliver, subscriber: _Subscriber, filter_value: Optional[FilterConfiguration]
) -> None:

if frame.subscription_id != subscriber.subscription_id:
return

@@ -382,7 +379,6 @@ async def _on_deliver(
await maybe_coro

async def _on_metadata_update(self, frame: schema.MetadataUpdate) -> None:

logger.debug("_on_metadata_update: On metadata update event triggered on producer")
if frame.metadata_info.stream not in self._clients:
return
@@ -401,7 +397,6 @@ async def _on_consumer_update_query_response(
reference: str,
consumer_update_listener: Optional[Callable[[bool, EventContext], Awaitable[Any]]] = None,
) -> None:

# even if the consumer is not active, we need to send a ConsumerUpdateResponse
# by protocol definition. the offsetType can't be null so we use OffsetTypeNext as default
if consumer_update_listener is None:
@@ -465,7 +460,6 @@ def get_stream(self, subscriber_name) -> str:
return self._subscribers[subscriber_name].stream

async def reconnect_stream(self, stream: str, offset: Optional[int] = None) -> None:

logging.debug("reconnect_stream")
curr_subscriber = None
curr_subscriber_id = None
@@ -505,7 +499,6 @@ async def reconnect_stream(self, stream: str, offset: Optional[int] = None) -> N
)

async def _check_if_filtering_is_supported(self) -> None:

command_version_input = schema.FrameHandlerInfo(Key.Publish.value, min_version=1, max_version=2)
server_command_version: schema.FrameHandlerInfo = await (
await self.default_client
@@ -526,13 +519,11 @@ async def _create_locator_connection(self) -> Client:
)

async def _close_locator_connection(self):

if await (await self.default_client).get_stream_count() == 0:
await (await self.default_client).close()
self._default_client = None

async def _maybe_clean_up_during_lost_connection(self, stream: str):

curr_subscriber = None

for subscriber_id in self._subscribers:
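reconnect_stream() above takes a stream name plus an optional offset, and the OnClosedErrorInfo passed to the disconnection callback in BestPracticesClient.py exposes the affected streams. A hypothetical sketch (not from the commit) of wiring the two together, assuming a consumer object in scope and that OnClosedErrorInfo is importable from rstream:

from rstream import OnClosedErrorInfo  # assumed import path

async def on_close_connection(on_closed_info: OnClosedErrorInfo) -> None:
    # reconnect every stream the broker reported as closed
    for stream in on_closed_info.streams:
        await consumer.reconnect_stream(stream)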
