From 9f9a5e962c5a4ba23ef222de2f5fcd5852ed384e Mon Sep 17 00:00:00 2001 From: Adam Hendel Date: Thu, 19 Dec 2024 12:22:11 -0600 Subject: [PATCH] types for read() --- tembo-pgmq-python/tembo_pgmq_python/async_queue.py | 4 ++-- tembo-pgmq-python/tembo_pgmq_python/queue.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tembo-pgmq-python/tembo_pgmq_python/async_queue.py b/tembo-pgmq-python/tembo_pgmq_python/async_queue.py index c538e564..2e45f078 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/async_queue.py +++ b/tembo-pgmq-python/tembo_pgmq_python/async_queue.py @@ -213,7 +213,7 @@ async def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optiona async def _read_internal(self, queue, vt, batch_size, conn): self.logger.debug(f"Reading message from queue '{queue}' with vt={vt}") rows = await conn.fetch( - "SELECT * FROM pgmq.read($1, $2, $3);", + "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);", queue, vt or self.vt, batch_size, @@ -246,7 +246,7 @@ async def read_batch( async def _read_batch_internal(self, queue, vt, batch_size, conn): self.logger.debug(f"Reading batch of messages from queue '{queue}' with vt={vt}") rows = await conn.fetch( - "SELECT * FROM pgmq.read($1, $2, $3);", + "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);", queue, vt or self.vt, batch_size, diff --git a/tembo-pgmq-python/tembo_pgmq_python/queue.py b/tembo-pgmq-python/tembo_pgmq_python/queue.py index a4f770a1..1c81d891 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/queue.py +++ b/tembo-pgmq-python/tembo_pgmq_python/queue.py @@ -133,7 +133,7 @@ def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Message]: """Read a message from a queue.""" self.logger.debug(f"read called with conn: {conn}") - query = "select * from pgmq.read(%s, %s, %s);" + query = "select * from pgmq.read(%s::text, %s::integer, %s::integer);" rows = self._execute_query_with_result(query, [queue, vt or self.vt, 1], conn=conn) messages = [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows] return messages[0] if messages else None @@ -142,7 +142,7 @@ def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Mess def read_batch(self, queue: str, vt: Optional[int] = None, batch_size=1, conn=None) -> Optional[List[Message]]: """Read a batch of messages from a queue.""" self.logger.debug(f"read_batch called with conn: {conn}") - query = "select * from pgmq.read(%s, %s, %s);" + query = "select * from pgmq.read(%s::text, %s::integer, %s::integer);" rows = self._execute_query_with_result(query, [queue, vt or self.vt, batch_size], conn=conn) return [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows] @@ -158,7 +158,7 @@ def read_with_poll( ) -> Optional[List[Message]]: """Read messages from a queue with polling.""" self.logger.debug(f"read_with_poll called with conn: {conn}") - query = "select * from pgmq.read_with_poll(%s, %s, %s, %s, %s);" + query = "select * from pgmq.read_with_poll(%s::text, %s::integer, %s::integer, %s::integer, %s::integer);" params = [queue, vt or self.vt, qty, max_poll_seconds, poll_interval_ms] rows = self._execute_query_with_result(query, params, conn=conn) return [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]