From a0f4de128b4178eeba1358513e677e0cb1fd73bc Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Thu, 11 Apr 2024 16:45:31 +0200 Subject: [PATCH] updating black --- .../examples/basic_producers/producer_send.py | 1 - docs/examples/deduplication/producer_ded.py | 2 - docs/examples/filtering/producer_filtering.py | 1 - .../reliable_client/BestPracticesClient.py | 10 -- .../single_active_consumer.py | 1 - poetry.lock | 133 +++++++++--------- pyproject.toml | 4 +- rstream/client.py | 6 - rstream/compression.py | 2 - rstream/consumer.py | 9 -- rstream/producer.py | 23 --- rstream/superstream.py | 1 - rstream/superstream_consumer.py | 3 - rstream/superstream_producer.py | 1 - tests/conftest.py | 3 - tests/http_requests.py | 1 - tests/test_client.py | 1 - tests/test_consumer.py | 8 -- tests/test_producer.py | 7 - tests/util.py | 11 -- 20 files changed, 66 insertions(+), 162 deletions(-) diff --git a/docs/examples/basic_producers/producer_send.py b/docs/examples/basic_producers/producer_send.py index 2696791..f2b8bdb 100644 --- a/docs/examples/basic_producers/producer_send.py +++ b/docs/examples/basic_producers/producer_send.py @@ -8,7 +8,6 @@ async def publish(): - async with Producer("localhost", username="guest", password="guest") as producer: # create a stream if it doesn't already exist await producer.create_stream(STREAM, exists_ok=True) diff --git a/docs/examples/deduplication/producer_ded.py b/docs/examples/deduplication/producer_ded.py index e296ee8..5e2008d 100644 --- a/docs/examples/deduplication/producer_ded.py +++ b/docs/examples/deduplication/producer_ded.py @@ -9,7 +9,6 @@ async def publish(): - async with Producer("localhost", username="guest", password="guest") as producer: # create a stream if it doesn't already exist await producer.create_stream(STREAM, exists_ok=True) @@ -19,7 +18,6 @@ async def publish(): for j in range(LOOP_1): for i in range(LOOP_2): - await producer.send( STREAM, # just 1000 messages will be inserted as messages with the same publishing_id and publisher_name will be discarded diff --git a/docs/examples/filtering/producer_filtering.py b/docs/examples/filtering/producer_filtering.py index b7f9b29..1e21991 100644 --- a/docs/examples/filtering/producer_filtering.py +++ b/docs/examples/filtering/producer_filtering.py @@ -12,7 +12,6 @@ async def filter_value_extractor(message: AMQPMessage) -> str: async def publish(): - async with Producer( "localhost", username="guest", password="guest", filter_value_extractor=filter_value_extractor ) as producer: diff --git a/docs/examples/reliable_client/BestPracticesClient.py b/docs/examples/reliable_client/BestPracticesClient.py index 2798dda..0df0275 100644 --- a/docs/examples/reliable_client/BestPracticesClient.py +++ b/docs/examples/reliable_client/BestPracticesClient.py @@ -61,7 +61,6 @@ async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer: stream_name = rabbitmq_data["StreamName"] if bool(rabbitmq_data["SuperStream"]) is False: - producer = Producer( host=host, username=username, @@ -72,7 +71,6 @@ async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer: ) else: - producer = SuperStreamProducer( # type: ignore host=host, username=username, @@ -90,7 +88,6 @@ async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer: # metadata and disconnection events for consumers async def on_close_connection(on_closed_info: OnClosedErrorInfo) -> None: - print( "connection has been closed from stream: " + str(on_closed_info.streams) @@ -120,7 +117,6 @@ async def on_close_connection(on_closed_info: OnClosedErrorInfo) -> None: # Make consumers async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer: - host = rabbitmq_data["Host"] username = rabbitmq_data["Username"] password = rabbitmq_data["Password"] @@ -130,7 +126,6 @@ async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer: stream_name = rabbitmq_data["StreamName"] if bool(rabbitmq_data["SuperStream"]) is False: - consumer = Consumer( host=host, username=username, @@ -142,7 +137,6 @@ async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer: ) else: - consumer = SuperStreamConsumer( # type: ignore host=host, username=username, @@ -181,7 +175,6 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext): async def publish(rabbitmq_configuration: dict): - global producer global messages_per_producer @@ -239,7 +232,6 @@ async def publish(rabbitmq_configuration: dict): async def consume(rabbitmq_configuration: dict): - global consumer is_super_stream_scenario = bool(rabbitmq_configuration["SuperStream"]) @@ -270,7 +262,6 @@ async def consume(rabbitmq_configuration: dict): async def close(producer_task: asyncio.Task, consumer_task: asyncio.Task, printer_test_task: asyncio.Task): - global producer global consumer @@ -286,7 +277,6 @@ async def close(producer_task: asyncio.Task, consumer_task: asyncio.Task, printe async def main(): - loop = asyncio.get_event_loop() loop.add_signal_handler( signal.SIGINT, lambda: asyncio.create_task(close(producer_task, consumer_task, printer_test_task)) diff --git a/docs/examples/single_active_consumer/single_active_consumer.py b/docs/examples/single_active_consumer/single_active_consumer.py index a2fa96c..9e3302f 100644 --- a/docs/examples/single_active_consumer/single_active_consumer.py +++ b/docs/examples/single_active_consumer/single_active_consumer.py @@ -46,7 +46,6 @@ async def consumer_update_handler_offset(is_active: bool, event_context: EventCo async def consume(): try: - print("Starting Super Stream Consumer") consumer = SuperStreamConsumer( host="localhost", diff --git a/poetry.lock b/poetry.lock index 89b75bc..3b5b6ee 100644 --- a/poetry.lock +++ b/poetry.lock @@ -24,33 +24,33 @@ files = [ [[package]] name = "black" -version = "24.3.0" +version = "23.12.1" description = "The uncompromising code formatter." optional = false python-versions = ">=3.8" files = [ - {file = "black-24.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:7d5e026f8da0322b5662fa7a8e752b3fa2dac1c1cbc213c3d7ff9bdd0ab12395"}, - {file = "black-24.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9f50ea1132e2189d8dff0115ab75b65590a3e97de1e143795adb4ce317934995"}, - {file = "black-24.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e2af80566f43c85f5797365077fb64a393861a3730bd110971ab7a0c94e873e7"}, - {file = "black-24.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:4be5bb28e090456adfc1255e03967fb67ca846a03be7aadf6249096100ee32d0"}, - {file = "black-24.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:4f1373a7808a8f135b774039f61d59e4be7eb56b2513d3d2f02a8b9365b8a8a9"}, - {file = "black-24.3.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:aadf7a02d947936ee418777e0247ea114f78aff0d0959461057cae8a04f20597"}, - {file = "black-24.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:65c02e4ea2ae09d16314d30912a58ada9a5c4fdfedf9512d23326128ac08ac3d"}, - {file = "black-24.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:bf21b7b230718a5f08bd32d5e4f1db7fc8788345c8aea1d155fc17852b3410f5"}, - {file = "black-24.3.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:2818cf72dfd5d289e48f37ccfa08b460bf469e67fb7c4abb07edc2e9f16fb63f"}, - {file = "black-24.3.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:4acf672def7eb1725f41f38bf6bf425c8237248bb0804faa3965c036f7672d11"}, - {file = "black-24.3.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c7ed6668cbbfcd231fa0dc1b137d3e40c04c7f786e626b405c62bcd5db5857e4"}, - {file = "black-24.3.0-cp312-cp312-win_amd64.whl", hash = "sha256:56f52cfbd3dabe2798d76dbdd299faa046a901041faf2cf33288bc4e6dae57b5"}, - {file = "black-24.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:79dcf34b33e38ed1b17434693763301d7ccbd1c5860674a8f871bd15139e7837"}, - {file = "black-24.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e19cb1c6365fd6dc38a6eae2dcb691d7d83935c10215aef8e6c38edee3f77abd"}, - {file = "black-24.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:65b76c275e4c1c5ce6e9870911384bff5ca31ab63d19c76811cb1fb162678213"}, - {file = "black-24.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:b5991d523eee14756f3c8d5df5231550ae8993e2286b8014e2fdea7156ed0959"}, - {file = "black-24.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:c45f8dff244b3c431b36e3224b6be4a127c6aca780853574c00faf99258041eb"}, - {file = "black-24.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:6905238a754ceb7788a73f02b45637d820b2f5478b20fec82ea865e4f5d4d9f7"}, - {file = "black-24.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d7de8d330763c66663661a1ffd432274a2f92f07feeddd89ffd085b5744f85e7"}, - {file = "black-24.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:7bb041dca0d784697af4646d3b62ba4a6b028276ae878e53f6b4f74ddd6db99f"}, - {file = "black-24.3.0-py3-none-any.whl", hash = "sha256:41622020d7120e01d377f74249e677039d20e6344ff5851de8a10f11f513bf93"}, - {file = "black-24.3.0.tar.gz", hash = "sha256:a0c9c4a0771afc6919578cec71ce82a3e31e054904e7197deacbc9382671c41f"}, + {file = "black-23.12.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e0aaf6041986767a5e0ce663c7a2f0e9eaf21e6ff87a5f95cbf3675bfd4c41d2"}, + {file = "black-23.12.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c88b3711d12905b74206227109272673edce0cb29f27e1385f33b0163c414bba"}, + {file = "black-23.12.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a920b569dc6b3472513ba6ddea21f440d4b4c699494d2e972a1753cdc25df7b0"}, + {file = "black-23.12.1-cp310-cp310-win_amd64.whl", hash = "sha256:3fa4be75ef2a6b96ea8d92b1587dd8cb3a35c7e3d51f0738ced0781c3aa3a5a3"}, + {file = "black-23.12.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:8d4df77958a622f9b5a4c96edb4b8c0034f8434032ab11077ec6c56ae9f384ba"}, + {file = "black-23.12.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:602cfb1196dc692424c70b6507593a2b29aac0547c1be9a1d1365f0d964c353b"}, + {file = "black-23.12.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9c4352800f14be5b4864016882cdba10755bd50805c95f728011bcb47a4afd59"}, + {file = "black-23.12.1-cp311-cp311-win_amd64.whl", hash = "sha256:0808494f2b2df923ffc5723ed3c7b096bd76341f6213989759287611e9837d50"}, + {file = "black-23.12.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:25e57fd232a6d6ff3f4478a6fd0580838e47c93c83eaf1ccc92d4faf27112c4e"}, + {file = "black-23.12.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2d9e13db441c509a3763a7a3d9a49ccc1b4e974a47be4e08ade2a228876500ec"}, + {file = "black-23.12.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6d1bd9c210f8b109b1762ec9fd36592fdd528485aadb3f5849b2740ef17e674e"}, + {file = "black-23.12.1-cp312-cp312-win_amd64.whl", hash = "sha256:ae76c22bde5cbb6bfd211ec343ded2163bba7883c7bc77f6b756a1049436fbb9"}, + {file = "black-23.12.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:1fa88a0f74e50e4487477bc0bb900c6781dbddfdfa32691e780bf854c3b4a47f"}, + {file = "black-23.12.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:a4d6a9668e45ad99d2f8ec70d5c8c04ef4f32f648ef39048d010b0689832ec6d"}, + {file = "black-23.12.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b18fb2ae6c4bb63eebe5be6bd869ba2f14fd0259bda7d18a46b764d8fb86298a"}, + {file = "black-23.12.1-cp38-cp38-win_amd64.whl", hash = "sha256:c04b6d9d20e9c13f43eee8ea87d44156b8505ca8a3c878773f68b4e4812a421e"}, + {file = "black-23.12.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:3e1b38b3135fd4c025c28c55ddfc236b05af657828a8a6abe5deec419a0b7055"}, + {file = "black-23.12.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4f0031eaa7b921db76decd73636ef3a12c942ed367d8c3841a0739412b260a54"}, + {file = "black-23.12.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:97e56155c6b737854e60a9ab1c598ff2533d57e7506d97af5481141671abf3ea"}, + {file = "black-23.12.1-cp39-cp39-win_amd64.whl", hash = "sha256:dd15245c8b68fe2b6bd0f32c1556509d11bb33aec9b5d0866dd8e2ed3dba09c2"}, + {file = "black-23.12.1-py3-none-any.whl", hash = "sha256:78baad24af0f033958cad29731e27363183e140962595def56423e626f4bee3e"}, + {file = "black-23.12.1.tar.gz", hash = "sha256:4ce3ef14ebe8d9509188014d96af1c456a910d5b5cbf434a09fef7e024b3d0d5"}, ] [package.dependencies] @@ -436,54 +436,60 @@ test = ["mypy (>=1.0)", "pytest (>=7.0.0)"] [[package]] name = "mypy" -version = "0.910" +version = "1.9.0" description = "Optional static typing for Python" optional = false -python-versions = ">=3.5" +python-versions = ">=3.8" files = [ - {file = "mypy-0.910-cp35-cp35m-macosx_10_9_x86_64.whl", hash = "sha256:a155d80ea6cee511a3694b108c4494a39f42de11ee4e61e72bc424c490e46457"}, - {file = "mypy-0.910-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:b94e4b785e304a04ea0828759172a15add27088520dc7e49ceade7834275bedb"}, - {file = "mypy-0.910-cp35-cp35m-manylinux2010_x86_64.whl", hash = "sha256:088cd9c7904b4ad80bec811053272986611b84221835e079be5bcad029e79dd9"}, - {file = "mypy-0.910-cp35-cp35m-win_amd64.whl", hash = "sha256:adaeee09bfde366d2c13fe6093a7df5df83c9a2ba98638c7d76b010694db760e"}, - {file = "mypy-0.910-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:ecd2c3fe726758037234c93df7e98deb257fd15c24c9180dacf1ef829da5f921"}, - {file = "mypy-0.910-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:d9dd839eb0dc1bbe866a288ba3c1afc33a202015d2ad83b31e875b5905a079b6"}, - {file = "mypy-0.910-cp36-cp36m-manylinux2010_x86_64.whl", hash = "sha256:3e382b29f8e0ccf19a2df2b29a167591245df90c0b5a2542249873b5c1d78212"}, - {file = "mypy-0.910-cp36-cp36m-win_amd64.whl", hash = "sha256:53fd2eb27a8ee2892614370896956af2ff61254c275aaee4c230ae771cadd885"}, - {file = "mypy-0.910-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:b6fb13123aeef4a3abbcfd7e71773ff3ff1526a7d3dc538f3929a49b42be03f0"}, - {file = "mypy-0.910-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:e4dab234478e3bd3ce83bac4193b2ecd9cf94e720ddd95ce69840273bf44f6de"}, - {file = "mypy-0.910-cp37-cp37m-manylinux2010_x86_64.whl", hash = "sha256:7df1ead20c81371ccd6091fa3e2878559b5c4d4caadaf1a484cf88d93ca06703"}, - {file = "mypy-0.910-cp37-cp37m-win_amd64.whl", hash = "sha256:0aadfb2d3935988ec3815952e44058a3100499f5be5b28c34ac9d79f002a4a9a"}, - {file = "mypy-0.910-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ec4e0cd079db280b6bdabdc807047ff3e199f334050db5cbb91ba3e959a67504"}, - {file = "mypy-0.910-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:119bed3832d961f3a880787bf621634ba042cb8dc850a7429f643508eeac97b9"}, - {file = "mypy-0.910-cp38-cp38-manylinux2010_x86_64.whl", hash = "sha256:866c41f28cee548475f146aa4d39a51cf3b6a84246969f3759cb3e9c742fc072"}, - {file = "mypy-0.910-cp38-cp38-win_amd64.whl", hash = "sha256:ceb6e0a6e27fb364fb3853389607cf7eb3a126ad335790fa1e14ed02fba50811"}, - {file = "mypy-0.910-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:1a85e280d4d217150ce8cb1a6dddffd14e753a4e0c3cf90baabb32cefa41b59e"}, - {file = "mypy-0.910-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:42c266ced41b65ed40a282c575705325fa7991af370036d3f134518336636f5b"}, - {file = "mypy-0.910-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:3c4b8ca36877fc75339253721f69603a9c7fdb5d4d5a95a1a1b899d8b86a4de2"}, - {file = "mypy-0.910-cp39-cp39-manylinux2010_x86_64.whl", hash = "sha256:c0df2d30ed496a08de5daed2a9ea807d07c21ae0ab23acf541ab88c24b26ab97"}, - {file = "mypy-0.910-cp39-cp39-win_amd64.whl", hash = "sha256:c6c2602dffb74867498f86e6129fd52a2770c48b7cd3ece77ada4fa38f94eba8"}, - {file = "mypy-0.910-py3-none-any.whl", hash = "sha256:ef565033fa5a958e62796867b1df10c40263ea9ded87164d67572834e57a174d"}, - {file = "mypy-0.910.tar.gz", hash = "sha256:704098302473cb31a218f1775a873b376b30b4c18229421e9e9dc8916fd16150"}, + {file = "mypy-1.9.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f8a67616990062232ee4c3952f41c779afac41405806042a8126fe96e098419f"}, + {file = "mypy-1.9.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d357423fa57a489e8c47b7c85dfb96698caba13d66e086b412298a1a0ea3b0ed"}, + {file = "mypy-1.9.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:49c87c15aed320de9b438ae7b00c1ac91cd393c1b854c2ce538e2a72d55df150"}, + {file = "mypy-1.9.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:48533cdd345c3c2e5ef48ba3b0d3880b257b423e7995dada04248725c6f77374"}, + {file = "mypy-1.9.0-cp310-cp310-win_amd64.whl", hash = "sha256:4d3dbd346cfec7cb98e6cbb6e0f3c23618af826316188d587d1c1bc34f0ede03"}, + {file = "mypy-1.9.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:653265f9a2784db65bfca694d1edd23093ce49740b2244cde583aeb134c008f3"}, + {file = "mypy-1.9.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3a3c007ff3ee90f69cf0a15cbcdf0995749569b86b6d2f327af01fd1b8aee9dc"}, + {file = "mypy-1.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2418488264eb41f69cc64a69a745fad4a8f86649af4b1041a4c64ee61fc61129"}, + {file = "mypy-1.9.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:68edad3dc7d70f2f17ae4c6c1b9471a56138ca22722487eebacfd1eb5321d612"}, + {file = "mypy-1.9.0-cp311-cp311-win_amd64.whl", hash = "sha256:85ca5fcc24f0b4aeedc1d02f93707bccc04733f21d41c88334c5482219b1ccb3"}, + {file = "mypy-1.9.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:aceb1db093b04db5cd390821464504111b8ec3e351eb85afd1433490163d60cd"}, + {file = "mypy-1.9.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0235391f1c6f6ce487b23b9dbd1327b4ec33bb93934aa986efe8a9563d9349e6"}, + {file = "mypy-1.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d4d5ddc13421ba3e2e082a6c2d74c2ddb3979c39b582dacd53dd5d9431237185"}, + {file = "mypy-1.9.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:190da1ee69b427d7efa8aa0d5e5ccd67a4fb04038c380237a0d96829cb157913"}, + {file = "mypy-1.9.0-cp312-cp312-win_amd64.whl", hash = "sha256:fe28657de3bfec596bbeef01cb219833ad9d38dd5393fc649f4b366840baefe6"}, + {file = "mypy-1.9.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:e54396d70be04b34f31d2edf3362c1edd023246c82f1730bbf8768c28db5361b"}, + {file = "mypy-1.9.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:5e6061f44f2313b94f920e91b204ec600982961e07a17e0f6cd83371cb23f5c2"}, + {file = "mypy-1.9.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:81a10926e5473c5fc3da8abb04119a1f5811a236dc3a38d92015cb1e6ba4cb9e"}, + {file = "mypy-1.9.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b685154e22e4e9199fc95f298661deea28aaede5ae16ccc8cbb1045e716b3e04"}, + {file = "mypy-1.9.0-cp38-cp38-win_amd64.whl", hash = "sha256:5d741d3fc7c4da608764073089e5f58ef6352bedc223ff58f2f038c2c4698a89"}, + {file = "mypy-1.9.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:587ce887f75dd9700252a3abbc9c97bbe165a4a630597845c61279cf32dfbf02"}, + {file = "mypy-1.9.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f88566144752999351725ac623471661c9d1cd8caa0134ff98cceeea181789f4"}, + {file = "mypy-1.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:61758fabd58ce4b0720ae1e2fea5cfd4431591d6d590b197775329264f86311d"}, + {file = "mypy-1.9.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:e49499be624dead83927e70c756970a0bc8240e9f769389cdf5714b0784ca6bf"}, + {file = "mypy-1.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:571741dc4194b4f82d344b15e8837e8c5fcc462d66d076748142327626a1b6e9"}, + {file = "mypy-1.9.0-py3-none-any.whl", hash = "sha256:a260627a570559181a9ea5de61ac6297aa5af202f06fd7ab093ce74e7181e43e"}, + {file = "mypy-1.9.0.tar.gz", hash = "sha256:3cc5da0127e6a478cddd906068496a97a7618a21ce9b54bde5bf7e539c7af974"}, ] [package.dependencies] -mypy-extensions = ">=0.4.3,<0.5.0" -toml = "*" -typing-extensions = ">=3.7.4" +mypy-extensions = ">=1.0.0" +tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} +typing-extensions = ">=4.1.0" [package.extras] dmypy = ["psutil (>=4.0)"] -python2 = ["typed-ast (>=1.4.0,<1.5.0)"] +install-types = ["pip"] +mypyc = ["setuptools (>=50)"] +reports = ["lxml"] [[package]] name = "mypy-extensions" -version = "0.4.3" -description = "Experimental type system extensions for programs checked with the mypy typechecker." +version = "1.0.0" +description = "Type system extensions for programs checked with the mypy type checker." optional = false -python-versions = "*" +python-versions = ">=3.5" files = [ - {file = "mypy_extensions-0.4.3-py2.py3-none-any.whl", hash = "sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d"}, - {file = "mypy_extensions-0.4.3.tar.gz", hash = "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8"}, + {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"}, + {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, ] [[package]] @@ -711,17 +717,6 @@ docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-g testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8 (<5)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] -[[package]] -name = "toml" -version = "0.10.2" -description = "Python Library for Tom's Obvious, Minimal Language" -optional = false -python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" -files = [ - {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, - {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, -] - [[package]] name = "tomli" version = "1.2.1" @@ -839,4 +834,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "b8be95542e16b1ade446b62a1d946bd94ef57c6b97b8db89b7972c27313a663f" +content-hash = "201df220273ee291d683fb7bb51443b7f05afbca578a26afe937fde7ac3ab10f" diff --git a/pyproject.toml b/pyproject.toml index 8ae2cf5..98e0390 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,10 +18,10 @@ mmh3 = "^4.0.0" ipython = "^7.13.0" flake8 = "^3.9.2" isort = "^5.9.3" -mypy = "^0.910" +mypy = "^1.9.0" pytest = "^7.4.0" pytest-asyncio = "^0.15.1" -black = "^24.0" +black = "^23.12.1" requests = "^2.31.0" mmh3 = "^4.0.0" diff --git a/rstream/client.py b/rstream/client.py index 82b520d..4312f0a 100644 --- a/rstream/client.py +++ b/rstream/client.py @@ -237,7 +237,6 @@ async def start(self) -> None: self.add_handler(schema.Close, self._on_close) async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) -> None: - if subscriber_name not in self._frames: self.start_task( f"run_delivery_handlers_{subscriber_name}", @@ -245,7 +244,6 @@ async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) - ) async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]): - while self.is_connection_alive(): frame_entry = await self._frames[subscriber_name].get() try: @@ -336,7 +334,6 @@ async def close(self) -> None: connection_is_broken = True if self._conn is not None and self.is_connection_alive(): - if self.is_started: try: await asyncio.wait_for( @@ -481,7 +478,6 @@ async def query_leader_and_replicas( self, stream: str, ) -> tuple[schema.Broker, list[schema.Broker]]: - while True: metadata_resp = await self.sync_request( schema.Metadata( @@ -617,7 +613,6 @@ async def partitions(self, super_stream: str) -> list[str]: return resp.streams async def consumer_update(self, correlation_id: int, offset_specification: OffsetSpecification) -> None: - await self.send_frame( schema.ConsumerUpdateServerResponse( correlation_id=correlation_id, @@ -641,7 +636,6 @@ async def route(self, routing_key: str, super_stream: str) -> list[str]: async def exchange_command_version( self, command_info: schema.FrameHandlerInfo ) -> schema.FrameHandlerInfo: - command_versions_input = [] command_versions_input.append(command_info) resp = await self.sync_request( diff --git a/rstream/compression.py b/rstream/compression.py index 439ab07..c11d610 100644 --- a/rstream/compression.py +++ b/rstream/compression.py @@ -100,7 +100,6 @@ class GzipCompressionCodec(ICompressionCodec): buffer: bytes = bytes() def compress(self, messages: list[MessageT]): - uncompressed_data = bytes() for item in messages: msg = RawMessage(item) if isinstance(item, bytes) else item @@ -114,7 +113,6 @@ def compress(self, messages: list[MessageT]): self.compressed_data_size = len(self.buffer) def uncompress(self, compressed_data: bytes, uncompressed_data_size: int) -> bytes: - uncompressed_data = gzip.decompress(compressed_data) if len(uncompressed_data) != uncompressed_data_size: diff --git a/rstream/consumer.py b/rstream/consumer.py index 3941a97..e0ceec2 100644 --- a/rstream/consumer.py +++ b/rstream/consumer.py @@ -184,7 +184,6 @@ async def _create_subscriber( offset_type: OffsetType, offset: Optional[int], ) -> _Subscriber: - logger.debug("_create_subscriber(): Create subscriber") client = await self._get_or_create_client(stream) @@ -219,7 +218,6 @@ async def subscribe( consumer_update_listener: Optional[Callable[[bool, EventContext], Awaitable[Any]]] = None, filter_input: Optional[FilterConfiguration] = None, ) -> str: - logger.debug("Consumer subscribe()") if offset_specification is None: offset_specification = ConsumerOffsetSpecification(OffsetType.FIRST, None) @@ -368,7 +366,6 @@ def _filter_messages( async def _on_deliver( self, frame: schema.Deliver, subscriber: _Subscriber, filter_value: Optional[FilterConfiguration] ) -> None: - if frame.subscription_id != subscriber.subscription_id: return @@ -382,7 +379,6 @@ async def _on_deliver( await maybe_coro async def _on_metadata_update(self, frame: schema.MetadataUpdate) -> None: - logger.debug("_on_metadata_update: On metadata update event triggered on producer") if frame.metadata_info.stream not in self._clients: return @@ -401,7 +397,6 @@ async def _on_consumer_update_query_response( reference: str, consumer_update_listener: Optional[Callable[[bool, EventContext], Awaitable[Any]]] = None, ) -> None: - # event the consumer is not active, we need to send a ConsumerUpdateResponse # by protocol definition. the offsetType can't be null so we use OffsetTypeNext as default if consumer_update_listener is None: @@ -465,7 +460,6 @@ def get_stream(self, subscriber_name) -> str: return self._subscribers[subscriber_name].stream async def reconnect_stream(self, stream: str, offset: Optional[int] = None) -> None: - logging.debug("reconnect_stream") curr_subscriber = None curr_subscriber_id = None @@ -505,7 +499,6 @@ async def reconnect_stream(self, stream: str, offset: Optional[int] = None) -> N ) async def _check_if_filtering_is_supported(self) -> None: - command_version_input = schema.FrameHandlerInfo(Key.Publish.value, min_version=1, max_version=2) server_command_version: schema.FrameHandlerInfo = await ( await self.default_client @@ -526,13 +519,11 @@ async def _create_locator_connection(self) -> Client: ) async def _close_locator_connection(self): - if await (await self.default_client).get_stream_count() == 0: await (await self.default_client).close() self._default_client = None async def _maybe_clean_up_during_lost_connection(self, stream: str): - curr_subscriber = None for subscriber_id in self._subscribers: diff --git a/rstream/producer.py b/rstream/producer.py index 3554f81..a13c529 100644 --- a/rstream/producer.py +++ b/rstream/producer.py @@ -153,7 +153,6 @@ async def start(self) -> None: await self._check_if_filtering_is_supported() async def close(self) -> None: - logger.debug("Closing producer and cleaning up") # check if we are in a server disconnection situation: # in this case we need avoid other send @@ -229,7 +228,6 @@ async def _get_or_create_publisher( assert publisher.reference == publisher_name return publisher try: - logger.debug("_get_or_create_publisher(): Getting/Creating new publisher") client = await self._get_or_create_client(stream) @@ -293,7 +291,6 @@ async def send_batch( publisher_name: Optional[str] = None, on_publish_confirm: Optional[CB[ConfirmationStatus]] = None, ) -> list[int]: - logger.debug("Sending synchronously with _send_batch()") if len(batch) == 0: @@ -316,7 +313,6 @@ async def _send_batch( sync: bool = True, timeout: Optional[int] = None, ) -> list[int]: - logger.debug("Internal _send_batch()") messages = [] publishing_ids = set() @@ -326,7 +322,6 @@ async def _send_batch( publisher = await self._get_or_create_publisher(stream, publisher_name=publisher_name) for item in batch: - msg = RawMessage(item) if isinstance(item, bytes) else item if msg.publishing_id is None: @@ -353,7 +348,6 @@ async def _send_batch( ) if len(messages) > 0: - if self._filter_value_extractor is None: logger.debug("_send_batch: Calling send_publish_frame version 1") await publisher.client.send_publish_frame( @@ -398,7 +392,6 @@ async def _send_batch_async( stream: str, batch: list[_MessageNotification], ) -> list[int]: - logger.debug("Internal _send_batch_async()") if len(batch) == 0: @@ -412,7 +405,6 @@ async def _send_batch_async( publishing_ids_callback: dict[CB[ConfirmationStatus], set[int]] = defaultdict(set) for item in batch: - async with self._lock: try: logger.debug("_send_batch_async: Getting or Creating publisher") @@ -517,7 +509,6 @@ async def _send_batch_async( publishing_ids.update([m.publishing_id for m in messages]) for callback in publishing_ids_callback: - if callback not in self._waiting_for_confirm[publisher.reference]: self._waiting_for_confirm[publisher.reference][callback] = set() @@ -532,7 +523,6 @@ async def send_wait( publisher_name: Optional[str] = None, timeout: Optional[int] = 5, ) -> int: - logger.debug("Sending synchronously with send_wait") publishing_ids = await self._send_batch( stream, @@ -544,7 +534,6 @@ async def send_wait( return publishing_ids[0] def _timer_completed(self, context): - logger.debug("Background ingestion task completed") if not context.cancelled(): if context.exception(): @@ -559,7 +548,6 @@ async def send( publisher_name: Optional[str] = None, on_publish_confirm: Optional[CB[ConfirmationStatus]] = None, ): - logger.debug("Sending asynchronously with send()") # start the background thread to send buffered messages if self.task is None: @@ -588,7 +576,6 @@ async def send_sub_entry( publisher_name: Optional[str] = None, on_publish_confirm: Optional[CB[ConfirmationStatus]] = None, ): - logger.debug("Sending asynchronously with send_sub_entry()") if len(sub_entry_messages) == 0: @@ -630,7 +617,6 @@ async def _timer(self): logger.error("exception in _timer: " + str(ex)) async def _publish_buffered_messages(self, stream: str) -> None: - logger.debug("publishing message with _publish_buffered_messages") async with self._buffered_messages_lock: if len(self._buffered_messages[stream]): @@ -638,7 +624,6 @@ async def _publish_buffered_messages(self, stream: str) -> None: self._buffered_messages[stream].clear() async def _on_publish_confirm(self, frame: schema.PublishConfirm, publisher: _Publisher) -> None: - logger.debug("_on_publish_confirm callback: waiting for confirmations") if frame.publisher_id != publisher.id: return @@ -663,7 +648,6 @@ async def _on_publish_confirm(self, frame: schema.PublishConfirm, publisher: _Pu confirmation.set_result(None) async def _on_publish_error(self, frame: schema.PublishError, publisher: _Publisher) -> None: - if frame.publisher_id != publisher.id: return @@ -688,7 +672,6 @@ async def _on_publish_error(self, frame: schema.PublishError, publisher: _Publis confirmation.set_exception(exc) async def _on_metadata_update(self, frame: schema.MetadataUpdate) -> None: - logger.debug("_on_metadata_update: On metadata update event triggered on producer") async with self._lock: await self._maybe_clean_up_during_lost_connection(frame.metadata_info.stream) @@ -699,7 +682,6 @@ async def create_stream( arguments: Optional[dict[str, Any]] = None, exists_ok: bool = False, ) -> None: - async with self._lock: try: await (await self.default_client).create_stream(stream, arguments) @@ -728,7 +710,6 @@ async def delete_stream(self, stream: str, missing_ok: bool = False) -> None: await self._close_locator_connection() async def stream_exists(self, stream: str, on_close_event: bool = False) -> bool: - async with self._lock: if on_close_event: self._default_client = None @@ -737,7 +718,6 @@ async def stream_exists(self, stream: str, on_close_event: bool = False) -> bool return stream_exist async def _check_if_filtering_is_supported(self) -> None: - logger.debug("_check_if_filtering_is_supported") command_version_input = schema.FrameHandlerInfo(Key.Publish.value, min_version=1, max_version=2) server_command_version: schema.FrameHandlerInfo = await ( @@ -762,13 +742,11 @@ async def _create_locator_connection(self) -> Client: ) async def _close_locator_connection(self): - if await (await self.default_client).get_stream_count() == 0: await (await self.default_client).close() self._default_client = None async def _maybe_clean_up_during_lost_connection(self, stream: str): - logger.debug( "_maybe_clean_up_during_lost_connection: Cleaning after disconnection or metaata update events" ) @@ -776,7 +754,6 @@ async def _maybe_clean_up_during_lost_connection(self, stream: str): await asyncio.sleep(randrange(3)) if stream in self._publishers: - # try to delete the publisher if deadling try: await asyncio.wait_for( diff --git a/rstream/superstream.py b/rstream/superstream.py index d0e04a9..f37782f 100644 --- a/rstream/superstream.py +++ b/rstream/superstream.py @@ -77,7 +77,6 @@ def __init__(self, routingKeyExtractor: CB[Any]): self.routingKeyExtractor: CB[Any] = routingKeyExtractor async def route(self, message: MessageT, metadata: Metadata) -> list[str]: - logger.debug("route() Compute routing") streams = [] key = await self.routingKeyExtractor(message) diff --git a/rstream/superstream_consumer.py b/rstream/superstream_consumer.py index 5d6086f..865de6e 100644 --- a/rstream/superstream_consumer.py +++ b/rstream/superstream_consumer.py @@ -132,7 +132,6 @@ async def _get_or_create_client(self, stream: str) -> Client: return self._clients[stream] async def get_consumer(self, partition: str): - return self._consumers[partition] async def subscribe( @@ -147,7 +146,6 @@ async def subscribe( consumer_update_listener: Optional[Callable[[bool, EventContext], Awaitable[Any]]] = None, filter_input: Optional[FilterConfiguration] = None, ): - logger.debug("Superstream subscribe()") if offset_specification is None: offset_specification = ConsumerOffsetSpecification(OffsetType.FIRST, None) @@ -206,7 +204,6 @@ async def _create_consumer(self) -> Consumer: return consumer async def unsubscribe(self) -> None: - logger.debug("unsubscribe(): unsubscribe superstream consumer unsubscribe all consumers") partitions = await self._super_stream_metadata.partitions() for partition in partitions: diff --git a/rstream/superstream_producer.py b/rstream/superstream_producer.py index 88756c5..ac87eb6 100644 --- a/rstream/superstream_producer.py +++ b/rstream/superstream_producer.py @@ -120,7 +120,6 @@ async def send( message: MessageT, on_publish_confirm: Optional[CB[ConfirmationStatus]] = None, ) -> None: - logger.debug("Send() asynchronously with superstream") streams = await self._routing_strategy.route(message, self.super_stream_metadata) self._producer = await self._get_producer() diff --git a/tests/conftest.py b/tests/conftest.py index ed3c70e..161d3d6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -275,7 +275,6 @@ async def super_stream_consumer(pytestconfig, ssl_context): @pytest.fixture() async def super_stream_consumer_for_sac1(pytestconfig, ssl_context): - consumer = SuperStreamConsumer( host=pytestconfig.getoption("rmq_host"), port=pytestconfig.getoption("rmq_port"), @@ -314,7 +313,6 @@ async def super_stream_consumer_for_sac2(pytestconfig, ssl_context): @pytest.fixture() async def super_stream_consumer_for_sac3(pytestconfig, ssl_context): - consumer = SuperStreamConsumer( host=pytestconfig.getoption("rmq_host"), port=pytestconfig.getoption("rmq_port"), @@ -334,7 +332,6 @@ async def super_stream_consumer_for_sac3(pytestconfig, ssl_context): @pytest.fixture() async def super_stream_consumer_for_sac4(pytestconfig, ssl_context): - consumer = SuperStreamConsumer( host=pytestconfig.getoption("rmq_host"), port=pytestconfig.getoption("rmq_port"), diff --git a/tests/http_requests.py b/tests/http_requests.py index b778168..fc41db7 100644 --- a/tests/http_requests.py +++ b/tests/http_requests.py @@ -17,7 +17,6 @@ def delete_exchange(exchange_name: str) -> int: def create_binding(exchange_name: str, routing_key: str, stream_name: str): - data = { "routing_key": routing_key, } diff --git a/tests/test_client.py b/tests/test_client.py index d08799d..dca8604 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -125,7 +125,6 @@ async def test_routes(client: Client, stream: str) -> None: async def exchange_command_versions(client: Client) -> None: - expected_min_version = 1 expected_max_version = 1 command_version_input = schema.FrameHandlerInfo( diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 70b43a3..85dee6c 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -534,7 +534,6 @@ async def on_message_first(msg: AMQPMessage, message_context: MessageContext): async def test_consumer_connection_broke(stream: str) -> None: - connection_broke = False stream_disconnected = None consumer_broke: Consumer @@ -574,7 +573,6 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext): async def test_super_stream_consumer_connection_broke(super_stream: str) -> None: - connection_broke = False streams_disconnected: set[str] = set() consumer_broke: Consumer @@ -624,7 +622,6 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext): # Send a few messages to a superstream, consume, simulate a disconnection and check for reconnection # from offset 0 async def test_super_stream_consumer_connection_broke_with_reconnect(super_stream: str) -> None: - connection_broke = False streams_disconnected: set[str] = set() consumer_broke: Consumer @@ -703,7 +700,6 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext): async def test_consume_filtering(stream: str, consumer: Consumer, producer_with_filtering: Producer) -> None: - filters = ["1"] captured: list[bytes] = [] @@ -723,7 +719,6 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext): ) for j in range(10): - messages = [] for i in range(50): application_properties = { @@ -744,7 +739,6 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext): async def test_consume_filtering_match_unfiltered( stream: str, consumer: Consumer, producer: Producer ) -> None: - filters = ["1"] captured: list[bytes] = [] @@ -764,7 +758,6 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext): ) for j in range(10): - messages = [] for i in range(50): application_properties = { @@ -783,7 +776,6 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext): async def test_consumer_metadata_update(consumer: Consumer) -> None: - consumer_closed = False stream_disconnected = None stream = "test-stream-metadata-update" diff --git a/tests/test_producer.py b/tests/test_producer.py index cc62bc9..d7ba1b1 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -282,7 +282,6 @@ async def test_concurrent_publish_async(stream: str, producer: Producer, consume async def test_send_async_confirmation(stream: str, producer: Producer) -> None: - confirmed_messages: list[int] = [] errored_messages: list[int] = [] @@ -305,7 +304,6 @@ async def publish_with_ids(*ids): # Checks if to different sends can be registered different callbacks async def test_send_async_confirmation_on_different_callbacks(stream: str, producer: Producer) -> None: - confirmed_messages: list[int] = [] confirmed_messages2: list[int] = [] errored_messages: list[int] = [] @@ -338,7 +336,6 @@ async def publish_with_ids(*ids): async def test_send_entry_subbatch_async_confirmation(stream: str, producer: Producer) -> None: - confirmed_messages: list[int] = [] errored_messages: list[int] = [] @@ -437,7 +434,6 @@ async def test_publishing_sequence_superstream_binary( async def publish_with_ids(*ids): for _ in ids: - await super_stream_producer.send(b"one") await publish_with_ids(1, 2, 3) @@ -448,7 +444,6 @@ async def publish_with_ids(*ids): async def test_publishing_sequence_superstream_with_callback( super_stream: str, super_stream_producer: SuperStreamProducer ) -> None: - confirmed_messages: list[int] = [] errored_messages: list[int] = [] @@ -472,7 +467,6 @@ async def publish_with_ids(*ids): async def test_producer_connection_broke(stream: str, consumer: Consumer) -> None: - producer_broke: Producer producer_broke = Producer( @@ -505,7 +499,6 @@ async def test_producer_connection_broke(stream: str, consumer: Consumer) -> Non async def test_super_stream_producer_connection_broke(super_stream: str, consumer: Consumer) -> None: - producer_broke: Producer super_stream_producer_broke = SuperStreamProducer( diff --git a/tests/util.py b/tests/util.py index ab41ed5..7eedf0c 100644 --- a/tests/util.py +++ b/tests/util.py @@ -39,24 +39,20 @@ async def _wait(): async def consumer_update_handler_next(is_active: bool, event_context: EventContext) -> OffsetSpecification: - return OffsetSpecification(OffsetType.NEXT, 0) async def consumer_update_handler_first(is_active: bool, event_context: EventContext) -> OffsetSpecification: - return OffsetSpecification(OffsetType.FIRST, 0) async def consumer_update_handler_offset(is_active: bool, event_context: EventContext) -> OffsetSpecification: - return OffsetSpecification(OffsetType.OFFSET, 10) async def on_publish_confirm_client_callback( confirmation: ConfirmationStatus, confirmed_messages: list[int], errored_messages: list[int] ) -> None: - if confirmation.is_confirmed is True: confirmed_messages.append(confirmation.message_id) else: @@ -66,7 +62,6 @@ async def on_publish_confirm_client_callback( async def on_publish_confirm_client_callback2( confirmation: ConfirmationStatus, confirmed_messages: list[int], errored_messages: list[int] ) -> None: - if confirmation.is_confirmed is True: confirmed_messages.append(confirmation.message_id) else: @@ -92,7 +87,6 @@ async def routing_extractor_key(message: AMQPMessage) -> str: async def on_message( msg: AMQPMessage, message_context: MessageContext, streams: list[str], offsets: list[int] ): - stream = message_context.consumer.get_stream(message_context.subscriber_name) streams.append(stream) offset = message_context.offset @@ -100,7 +94,6 @@ async def on_message( async def on_message_sac(msg: AMQPMessage, message_context: MessageContext, streams: list[str]): - stream = message_context.consumer.get_stream(message_context.subscriber_name) streams.append(stream) @@ -110,7 +103,6 @@ async def run_consumer( streams: list[str], consumer_update_listener: Optional[Callable[[bool, EventContext], Awaitable[Any]]] = None, ): - properties: dict[str, str] = defaultdict(str) properties["single-active-consumer"] = "true" properties["name"] = "consumer-group-1" @@ -127,7 +119,6 @@ async def run_consumer( async def task_to_delete_connection(connection_name: str) -> None: - # delay a few seconds before deleting the connection await asyncio.sleep(5) @@ -142,7 +133,6 @@ async def task_to_delete_connection(connection_name: str) -> None: async def task_to_delete_stream_producer(producer: Producer, stream: str) -> None: - # delay a few seconds before deleting the connection await asyncio.sleep(4) @@ -150,7 +140,6 @@ async def task_to_delete_stream_producer(producer: Producer, stream: str) -> Non async def task_to_delete_stream_consumer(consumer: Consumer, stream: str) -> None: - # delay a few seconds before deleting the connection await asyncio.sleep(4)