diff --git a/CHANGELOG.md b/CHANGELOG.md index 652cce681..01aec2929 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ Yii2 Queue Extension Change Log 2.3.8 under development ----------------------- - +- Enh: Ensure Redis driver messages are consumed at least once (soul11201) - Bug #522: Fix SQS driver type error with custom value passed to `queue/listen` (flaviovs) diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index deabbb870..1da823879 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -170,10 +170,10 @@ protected function moveExpired($from) { $now = time(); if ($expired = $this->redis->zrevrangebyscore($from, $now, '-inf')) { - $this->redis->zremrangebyscore($from, '-inf', $now); foreach ($expired as $id) { $this->redis->rpush("$this->channel.waiting", $id); } + $this->redis->zremrangebyscore($from, '-inf', $now); } } diff --git a/tests/drivers/redis/QueueTest.php b/tests/drivers/redis/QueueTest.php index 8fbbf5a00..92200fd63 100644 --- a/tests/drivers/redis/QueueTest.php +++ b/tests/drivers/redis/QueueTest.php @@ -10,6 +10,7 @@ use tests\app\RetryJob; use tests\drivers\CliTestCase; use Yii; +use yii\di\Instance; use yii\queue\redis\Queue; /** @@ -137,4 +138,53 @@ protected function tearDown() $this->getQueue()->redis->flushdb(); parent::tearDown(); } + + /** + * Verify that Redis data persists when process crashes during moveExpired. + * + * Steps: + * 1. Push a delayed job into queue + * 2. Wait for the job to expire + * 3. Mock Redis to simulate crash during moveExpired + * 4. Successfully process job after recovery + */ + public function testConsumeDelayedMessageAtLeastOnce() + { + $job = $this->createSimpleJob(); + $this->getQueue()->delay(1)->push($job); + // Expect a single message to be received. + $messageCount = 0; + $this->getQueue()->messageHandler = function () use(&$messageCount) { + $messageCount++; + }; + + // Ensure the delayed message can be consumed when more time passed than the delay is. + sleep(2); + + // Based on the implemention, emulate a crash when redis "rpush" + // command should be executed. + $mockRedis = Instance::ensure([ + 'class' => RedisCrashMock::class, + 'hostname' => getenv('REDIS_HOST') ?: 'localhost', + 'database' => getenv('REDIS_DB') ?: 1, + 'crashOnCommand' => 'rpush' // Crash when trying to move job to waiting queue. + ], 'yii\redis\Connection'); + + $queue = $this->getQueue(); + $old = $queue->redis; + $queue->redis = $mockRedis; + + try { + $queue->run(false); + } catch (\Exception $e) { + // Ignore exceptions. + } finally { + $queue->redis = $old; + } + + // Ensure the red lock is invalid. The red lock is valid for 1s. + sleep(2); + $this->getQueue()->run(false); + $this->assertEquals(1, $messageCount); + } } diff --git a/tests/drivers/redis/RedisCrashMock.php b/tests/drivers/redis/RedisCrashMock.php new file mode 100644 index 000000000..e463e08a9 --- /dev/null +++ b/tests/drivers/redis/RedisCrashMock.php @@ -0,0 +1,17 @@ +crashOnCommand) { + throw new \RuntimeException('Simulated Redis crash'); + } + return parent::executeCommand($name, $params); + } +}