Skip to content

Commit

Permalink
types for read()
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend committed Dec 19, 2024
1 parent 4078307 commit 9f9a5e9
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
4 changes: 2 additions & 2 deletions tembo-pgmq-python/tembo_pgmq_python/async_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ async def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optiona
async def _read_internal(self, queue, vt, batch_size, conn):
self.logger.debug(f"Reading message from queue '{queue}' with vt={vt}")
rows = await conn.fetch(
"SELECT * FROM pgmq.read($1, $2, $3);",
"SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);",
queue,
vt or self.vt,
batch_size,
Expand Down Expand Up @@ -246,7 +246,7 @@ async def read_batch(
async def _read_batch_internal(self, queue, vt, batch_size, conn):
self.logger.debug(f"Reading batch of messages from queue '{queue}' with vt={vt}")
rows = await conn.fetch(
"SELECT * FROM pgmq.read($1, $2, $3);",
"SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);",
queue,
vt or self.vt,
batch_size,
Expand Down
6 changes: 3 additions & 3 deletions tembo-pgmq-python/tembo_pgmq_python/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None
def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Message]:
"""Read a message from a queue."""
self.logger.debug(f"read called with conn: {conn}")
query = "select * from pgmq.read(%s, %s, %s);"
query = "select * from pgmq.read(%s::text, %s::integer, %s::integer);"
rows = self._execute_query_with_result(query, [queue, vt or self.vt, 1], conn=conn)
messages = [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]
return messages[0] if messages else None
Expand All @@ -142,7 +142,7 @@ def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Mess
def read_batch(self, queue: str, vt: Optional[int] = None, batch_size=1, conn=None) -> Optional[List[Message]]:
"""Read a batch of messages from a queue."""
self.logger.debug(f"read_batch called with conn: {conn}")
query = "select * from pgmq.read(%s, %s, %s);"
query = "select * from pgmq.read(%s::text, %s::integer, %s::integer);"
rows = self._execute_query_with_result(query, [queue, vt or self.vt, batch_size], conn=conn)
return [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]

Expand All @@ -158,7 +158,7 @@ def read_with_poll(
) -> Optional[List[Message]]:
"""Read messages from a queue with polling."""
self.logger.debug(f"read_with_poll called with conn: {conn}")
query = "select * from pgmq.read_with_poll(%s, %s, %s, %s, %s);"
query = "select * from pgmq.read_with_poll(%s::text, %s::integer, %s::integer, %s::integer, %s::integer);"
params = [queue, vt or self.vt, qty, max_poll_seconds, poll_interval_ms]
rows = self._execute_query_with_result(query, params, conn=conn)
return [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]
Expand Down

0 comments on commit 9f9a5e9

Please sign in to comment.