-
Hello. First: THANK YOU and congratulation for your awesome package !!! It's really very smart ! I've written a small app to try to explain my issue (probably a usage I don't understand...). If I use faststream directlyI register a consumer dynamically (because the queue is created dynamically) like that: broker = RabbitBroker()
# ...
subscriber = broker.subscriber(queue) # queue is a RabbitQueue, with a dynamic name, computed at runtime.
subscriber(handle_method)
broker.setup_subscriber(subscriber)
await subscriber.start() handle_method is: class StatusData(BaseModel):
topic: str
value: dict
async def handle_method(status_data: StatusData):
logging.info(status_data) If I use fastream as a fastapi pluginAlmost same code: router = RabbitRouter()
# ...
subscriber = router.broker.subscriber(queue)
subscriber(handle_method)
router.broker.setup_subscriber(subscriber)
await subscriber.start() With the same handle_method. differences I can see:
Questions
Thanks for your help and explanationsand your great great work! |
Beta Was this translation helpful? Give feedback.
Replies: 5 comments 3 replies
-
Yeah, I know about it - FastAPI subscriber is a little bit changes from the original one. I am working on consistency with these two, but it is still WIP |
Beta Was this translation helpful? Give feedback.
-
Hi. |
Beta Was this translation helpful? Give feedback.
-
Hello. About that discussion: is it covered by an issue (enh) ? Thanks. Nico |
Beta Was this translation helpful? Give feedback.
-
@gri38 we had just a #1640 Issue for it. |
Beta Was this translation helpful? Give feedback.
-
Just for me later or other if it can help. Here is my tests, and indeed I can write the same code to access data and Rabbit message: Faststream app# Read https://faststream.airt.ai/latest/getting-started/context/
Message = Annotated[RabbitMessage, Context("message")]
broker = RabbitBroker()
app = FastStream(broker)
queue = RabbitQueue("Test", auto_delete=True, durable=True)
@broker.subscriber(queue, title="TestFS", description="TestFS desc")
def handle_method2(data: StatusData, message: Message):
# here data has been validated by pydantic (StatusData is a BaseModel)
logger.info("Message received")
@app.after_startup
async def app_after_startup():
logger.info("+++++++++++++++")
await broker.declare_queue(queue)
async def main():
await app.run(log_level=logging.INFO)
if __name__ == "__main__":
asyncio.run(main()) faststream as fastapi plugin# Read https://faststream.airt.ai/latest/getting-started/context/
Message = Annotated[RabbitMessage, Context("message")]
rabbitmq_router = RabbitRouter()
@asynccontextmanager
async def fastapi_lifespan(_application: FastAPI):
logging.info("Please visit Swagger UI at http://localhost:8000/docs")
yield
logging.info("Goodbye")
@asynccontextmanager
async def lifespan(application: FastAPI):
async with (fastapi_lifespan(application), rabbitmq_router.lifespan_context(application), ):
yield
_queue = RabbitQueue("TestFA", auto_delete=True, durable=True)
@rabbitmq_router.after_startup
async def broker_after_startup(_app: FastAPI):
await rabbitmq_router.broker.declare_queue(_queue)
logging.info("Broker is ready.")
@rabbitmq_router.subscriber(_queue)
async def handle_message(data: StatusData, msg: Message):
# same a sin faststream app
logger.info("Message received")
app = FastAPI(lifespan=lifespan)
app.include_router(rabbitmq_router) |
Beta Was this translation helpful? Give feedback.
@gri38 we had just a #1640 Issue for it.
It was a service mainly change and we don't announce it wire enough.
Now FastAPI integration creates the same objects with regular FastStream broker/router, supports our Response classes and able to include FastStream Routers to it