Skip to content

Commit

Permalink
Use FOR UPDATE SKIP LOCKED when supported by the database.
Browse files Browse the repository at this point in the history
Replaces #817
  • Loading branch information
coleifer committed Oct 8, 2024
1 parent 682d1b1 commit dd513aa
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions huey/contrib/sql_huey.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,23 @@ def __init__(self, name='huey', database=None, **kwargs):
self.KV, self.Schedule, self.Task = self.create_models()
self.create_tables()

# Check for FOR UPDATE SKIP LOCKED support.
if isinstance(self.database, PostgresqlDatabase):
self.for_update = 'FOR UPDATE SKIP LOCKED'
elif isinstance(self.database, MySQLDatabase):
self.for_update = 'FOR UPDATE SKIP LOCKED' # Assume support.
# Try to determine if we're using MariaDB or MySQL.
version, = self.database.execute_sql('select version()').fetchone()
if 'mariadb' in str(version).lower():
# MariaDB added support in 10.6.0.
if self.database.server_version < (10, 6):
self.for_update = 'FOR UPDATE'
elif self.database.server_version < (8, 0, 1):
# MySQL added support in 8.0.1.
self.for_update = 'FOR UPDATE'
else:
self.for_update = None

def create_models(self):
class Base(Model):
class Meta:
Expand Down Expand Up @@ -96,8 +113,8 @@ def dequeue(self):
query = (self.tasks(self.Task.id, self.Task.data)
.order_by(self.Task.priority.desc(), self.Task.id)
.limit(1))
if self.database.for_update:
query = query.for_update()
if self.for_update:
query = query.for_update(self.for_update)

with self.database.atomic():
try:
Expand Down Expand Up @@ -131,8 +148,8 @@ def read_schedule(self, timestamp):
query = (self.schedule(self.Schedule.id, self.Schedule.data)
.where(self.Schedule.timestamp <= timestamp)
.tuples())
if self.database.for_update:
query = query.for_update()
if self.for_update:
query = query.for_update(self.for_update)

with self.database.atomic():
results = list(query)
Expand Down Expand Up @@ -185,8 +202,8 @@ def peek_data(self, key):
def pop_data(self, key):
self.check_conn()
query = self.kv().where(self.KV.key == key)
if self.database.for_update:
query = query.for_update()
if self.for_update:
query = query.for_update(self.for_update)

with self.database.atomic():
try:
Expand Down

0 comments on commit dd513aa

Please sign in to comment.