Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #173 #244

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 87 additions & 23 deletions src/drivers/redis/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use yii\di\Instance;
use yii\queue\cli\Queue as CliQueue;
use yii\redis\Connection;
use yii\redis\Mutex;

/**
* Redis Queue.
Expand All @@ -24,10 +25,25 @@ class Queue extends CliQueue
* @var Connection|array|string
*/
public $redis = 'redis';

/**
* @var Mutex|array|string
*/
public $mutex = [
'class' => Mutex::class,
'redis' => 'redis',
];

/**
* @var integer
*/
public $mutexTimeout = 3;

/**
* @var string
*/
public $channel = 'queue';

/**
* @var string command class name
*/
Expand All @@ -41,6 +57,7 @@ public function init()
{
parent::init();
$this->redis = Instance::ensure($this->redis, Connection::class);
$this->mutex = Instance::ensure($this->mutex, Mutex::class);
}

/**
Expand All @@ -56,13 +73,26 @@ public function run($repeat, $timeout = 0)
{
return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
while ($canContinue()) {
if (($payload = $this->reserve($timeout)) !== null) {
list($id, $message, $ttr, $attempt) = $payload;
if ($this->handleMessage($id, $message, $ttr, $attempt)) {
$this->delete($id);
if ($this->acquire()) {
try {
$payload = $this->reserve($timeout);
} finally {
$this->release();
}

if ($payload !== null) {
list($id, $message, $ttr, $attempt) = $payload;
if ($this->handleMessage($id, $message, $ttr, $attempt)) {
$this->delete($id);
} else {
// job is failed but we want to return it back into
// the queue
$this->redis->zadd("$this->channel.reserved", time(), $id);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That cold lead to fail-add indefinite cycle. Should be limited number of re-adding the job back.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, you are right, but I rely on a class/global defined number of attempts. In this case if all attempts have been used up, job would be marked as a handled and this condition will never be invoked.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha.

}

} elseif (!$repeat) {
break;
}
} elseif (!$repeat) {
break;
}
}
});
Expand Down Expand Up @@ -95,10 +125,15 @@ public function status($id)
*/
public function clear()
{
while (!$this->redis->set("$this->channel.moving_lock", true, 'NX')) {
while (!$this->acquire(0)) {
usleep(10000);
}
$this->redis->executeCommand('DEL', $this->redis->keys("$this->channel.*"));

try {
$this->redis->executeCommand('DEL', $this->redis->keys("$this->channel.*"));
} finally {
$this->release();
}
}

/**
Expand All @@ -110,19 +145,25 @@ public function clear()
*/
public function remove($id)
{
while (!$this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
while (!$this->acquire(0)) {
usleep(10000);
}
if ($this->redis->hdel("$this->channel.messages", $id)) {
$this->redis->zrem("$this->channel.delayed", $id);
$this->redis->zrem("$this->channel.reserved", $id);
$this->redis->lrem("$this->channel.waiting", 0, $id);
$this->redis->hdel("$this->channel.attempts", $id);

return true;
}
try {
if ($this->redis->hdel("$this->channel.messages", $id)) {
$this->redis->zrem("$this->channel.delayed", $id);
$this->redis->zrem("$this->channel.reserved", $id);
$this->redis->lrem("$this->channel.waiting", 0, $id);
$this->redis->hdel("$this->channel.attempts", $id);

return true;
}

return false;

return false;
} finally {
$this->release();
}
}

/**
Expand All @@ -131,11 +172,9 @@ public function remove($id)
*/
protected function reserve($timeout)
{
// Moves delayed and reserved jobs into waiting list with lock for one second
if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
$this->moveExpired("$this->channel.delayed");
$this->moveExpired("$this->channel.reserved");
}
// Moves delayed and reserved jobs into waiting list
$this->moveExpired("$this->channel.delayed");
$this->moveExpired("$this->channel.reserved");

// Find a new waiting message
$id = null;
Expand All @@ -150,7 +189,9 @@ protected function reserve($timeout)

$payload = $this->redis->hget("$this->channel.messages", $id);
list($ttr, $message) = explode(';', $payload, 2);
$this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
if (!empty($ttr)) {
$this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
}
$attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);

return [$id, $message, $ttr, $attempt];
Expand Down Expand Up @@ -201,4 +242,27 @@ protected function pushMessage($message, $ttr, $delay, $priority)

return $id;
}

/**
* Acquire the lock.
*
* @return boolean
*/
protected function acquire($timeout = null)
{
$timeout = $timeout !== null ? $timeout : $this->mutexTimeout;

return $this->mutex->acquire(__CLASS__ . $this->channel, $timeout);
}

/**
* Release the lock.
*
* @return boolean
*/
protected function release()
{
return $this->mutex->release(__CLASS__ . $this->channel);
}

}