diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index 994d9e865f..bde32cb981 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -12,6 +12,8 @@ use yii\di\Instance; use yii\queue\cli\Queue as CliQueue; use yii\redis\Connection; +use yii\redis\Mutex; +use yii\mutex\Mutex as BaseMutex; /** * Redis Queue. @@ -20,14 +22,34 @@ */ class Queue extends CliQueue { + /** + * @var int + */ + const MUTEX_TIMEOUT = 0; + /** * @var Connection|array|string */ public $redis = 'redis'; + + /** + * @var Mutex|array|string + */ + public $mutex = [ + 'class' => Mutex::class, + 'redis' => 'redis', + ]; + + /** + * @var integer Number of microseconds between attempts to acquire a lock. + */ + public $acquireTimeout = 10000; + /** * @var string */ public $channel = 'queue'; + /** * @var string command class name */ @@ -41,6 +63,7 @@ public function init() { parent::init(); $this->redis = Instance::ensure($this->redis, Connection::class); + $this->mutex = Instance::ensure($this->mutex, BaseMutex::class); } /** @@ -56,13 +79,28 @@ public function run($repeat, $timeout = 0) { return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { while ($canContinue()) { - if (($payload = $this->reserve($timeout)) !== null) { - list($id, $message, $ttr, $attempt) = $payload; - if ($this->handleMessage($id, $message, $ttr, $attempt)) { - $this->delete($id); + if ($this->acquire()) { + try { + $payload = $this->reserve($timeout); + } finally { + $this->release(); + } + + if ($payload !== null) { + list($id, $message, $ttr, $attempt) = $payload; + if ($this->handleMessage($id, $message, $ttr, $attempt)) { + $this->delete($id); + } else { + // job is failed but we want to return it back into + // the queue + $this->redis->zadd("$this->channel.reserved", time(), $id); + } + + } elseif (!$repeat) { + break; } - } elseif (!$repeat) { - break; + } else { + usleep($this->acquireTimeout); } } }); @@ -95,10 +133,15 @@ public function status($id) */ public function clear() { - while (!$this->redis->set("$this->channel.moving_lock", true, 'NX')) { - usleep(10000); + while (!$this->acquire()) { + usleep($this->acquireTimeout); + } + + try { + $this->redis->executeCommand('DEL', $this->redis->keys("$this->channel.*")); + } finally { + $this->release(); } - $this->redis->executeCommand('DEL', $this->redis->keys("$this->channel.*")); } /** @@ -110,19 +153,25 @@ public function clear() */ public function remove($id) { - while (!$this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) { - usleep(10000); + while (!$this->acquire()) { + usleep($this->acquireTimeout); } - if ($this->redis->hdel("$this->channel.messages", $id)) { - $this->redis->zrem("$this->channel.delayed", $id); - $this->redis->zrem("$this->channel.reserved", $id); - $this->redis->lrem("$this->channel.waiting", 0, $id); - $this->redis->hdel("$this->channel.attempts", $id); - return true; - } + try { + if ($this->redis->hdel("$this->channel.messages", $id)) { + $this->redis->zrem("$this->channel.delayed", $id); + $this->redis->zrem("$this->channel.reserved", $id); + $this->redis->lrem("$this->channel.waiting", 0, $id); + $this->redis->hdel("$this->channel.attempts", $id); + + return true; + } + + return false; - return false; + } finally { + $this->release(); + } } /** @@ -131,11 +180,9 @@ public function remove($id) */ protected function reserve($timeout) { - // Moves delayed and reserved jobs into waiting list with lock for one second - if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) { - $this->moveExpired("$this->channel.delayed"); - $this->moveExpired("$this->channel.reserved"); - } + // Moves delayed and reserved jobs into waiting list + $this->moveExpired("$this->channel.delayed"); + $this->moveExpired("$this->channel.reserved"); // Find a new waiting message $id = null; @@ -150,7 +197,9 @@ protected function reserve($timeout) $payload = $this->redis->hget("$this->channel.messages", $id); list($ttr, $message) = explode(';', $payload, 2); - $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id); + if (!empty($ttr)) { + $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id); + } $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1); return [$id, $message, $ttr, $attempt]; @@ -201,4 +250,25 @@ protected function pushMessage($message, $ttr, $delay, $priority) return $id; } + + /** + * Acquire the lock. + * + * @return boolean + */ + protected function acquire() + { + return $this->mutex->acquire(__CLASS__ . $this->channel, self::MUTEX_TIMEOUT); + } + + /** + * Release the lock. + * + * @return boolean + */ + protected function release() + { + return $this->mutex->release(__CLASS__ . $this->channel); + } + }