Skip to content

Commit

Permalink
Merge pull request #15 from grzegorz-aniol/fix/custom-queue-name
Browse files Browse the repository at this point in the history
Fix for supporting custom queue table name
  • Loading branch information
polyrand authored Aug 8, 2024
2 parents 3253758 + 21980b2 commit 14c0dd1
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
6 changes: 3 additions & 3 deletions litequeue.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ def list_failed(self) -> Iterable[Message]:

cursor = self.conn.execute(
f"""
SELECT * FROM Queue
SELECT * FROM {self.table}
WHERE
status = {MessageStatus.FAILED.value}
""".strip()
Expand Down Expand Up @@ -546,11 +546,11 @@ def prune(self, include_failed: bool = True):
"""
if include_failed:
self.conn.execute(
f"DELETE FROM Queue WHERE status IN ({MessageStatus.DONE.value}, {MessageStatus.FAILED.value})"
f"DELETE FROM {self.table} WHERE status IN ({MessageStatus.DONE.value}, {MessageStatus.FAILED.value})"
)
else:
self.conn.execute(
f"DELETE FROM Queue WHERE status IN ({MessageStatus.DONE.value})"
f"DELETE FROM {self.table} WHERE status IN ({MessageStatus.DONE.value})"
)

def vacuum(self):
Expand Down
24 changes: 21 additions & 3 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@
# https://docs.pytest.org/en/7.1.x/how-to/fixtures.html#parametrizing-fixtures


@pytest.fixture(scope="function", params=[None, "CustomQueue"], ids=["table_name=<Default>", "table_name=CustomQueue"])
def queue_name(request) -> str:
return request.param


@pytest.fixture(scope="function", params=["_pop_transaction", "_pop_returning"])
def single_queue(request) -> LiteQueue:
_q = LiteQueue(":memory:")
def single_queue(request, queue_name) -> LiteQueue:
kwargs = {'filename_or_conn': ':memory:'}
if queue_name is not None:
kwargs['queue_name'] = queue_name
_q = LiteQueue(**kwargs)

if _q.get_sqlite_version() > 35:
_q.pop = getattr(_q, request.param)
Expand Down Expand Up @@ -120,7 +128,7 @@ def test_prune(queue_with_data):

assert (
q.conn.execute(
f"SELECT * FROM Queue WHERE status = {MessageStatus.DONE.value}"
f"SELECT * FROM {q.table} WHERE status = {MessageStatus.DONE.value}"
).fetchall()
== []
)
Expand Down Expand Up @@ -181,3 +189,13 @@ def test_retry_failed(single_queue):
assert q.get(task.message_id).status == MessageStatus.READY
assert q.get(task.message_id).done_time is None
assert q.qsize() == 1


def test_count_failed(single_queue):
q = single_queue

q.put("foot")
task = q.pop()
q.mark_failed(task.message_id)

assert len(list(q.list_failed())) == 1

0 comments on commit 14c0dd1

Please sign in to comment.