Skip to content

Commit

Permalink
队列实现优化
Browse files Browse the repository at this point in the history
  • Loading branch information
kiss291323003 committed Feb 22, 2023
1 parent 865295f commit f354996
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 39 deletions.
64 changes: 49 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
composer require easyswoole/queue
```

## 使用
默认自带的队列驱动为Redis队列。
### 队列驱动

任何队列驱动都必须实现```EasySwoole\Queue\QueueDriverInterface```这个接口定义。且实现的对象一定是必须可被克隆(可以看Queue中的Producer方法即知道为何)。
队列在被加载到对应topic的Producer和Consumer时,都会被分别执行一次init()方法。

### 创建队列
```php

Expand All @@ -29,15 +32,17 @@ $queue = new Queue($driver);
```
$job = new Job();
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
$queue->producer()->push($job);
$queue->producer("topic")->push($job);
```
### 普通消费
```
$job = $queue->consumer()->pop();
```php

$job = $queue->consumer("topic")->pop();
//或者是自定义进程中
$queue->consumer()->listen(function (Job $job){
$queue->consumer("topicName")->listen(function (Job $job){
var_dump($job);
});

```

## CLI 单独使用
Expand Down Expand Up @@ -67,13 +72,42 @@ $sc->add(function (){
while (1){
Coroutine::sleep(3);
$job = new Job();
$job->setJobData("job create at ".time());
$queue->producer()->push($job);
$job->setJobData("job test create at ".time());
try {
$queue->producer("test")->push($job);
}catch (\Throwable $throwable){

}
}
});

Coroutine::create(function ()use($queue){
while (1){
Coroutine::sleep(5);
$job = new Job();
$job->setJobData("job another create at ".time());
try {
$queue->producer("another")->push($job);
}catch (\Throwable $throwable){

}
}
});

$queue->consumer()->listen(function (Job $job){
var_dump($job->getJobData());
Coroutine::create(function ()use($queue){
$queue->consumer("test")->listen(function (Job $job){
var_dump($job->getJobData() ." hande in test");
},[],function (){

});
});

Coroutine::create(function ()use($queue){
$queue->consumer("another")->listen(function (Job $job){
var_dump($job->getJobData() ." hande in another");
},[],function (){

});
});

});
Expand All @@ -86,17 +120,17 @@ $sc->start();
$job = new Job();
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
$job->setDelayTime(5);//设置延后时间
$queue->producer()->push($job);
$queue->producer("topic")->push($job);
```
## 可信任务
```
$job = new Job();
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
$job->setRetryTimes(3);//任务如果没有确认,则会执行三次
$job->setWaitConfirmTime(5);//如果5秒内没确认任务,会重新回到队列。默认为3秒
$queue->producer()->push($job);//投递任务
$queue->producer("topic")->push($job);//投递任务
//确认一个任务
$queue->consumer()->confirm($job);
$queue->consumer("topic")->confirm($job);
```

## 消费者控制
Expand All @@ -105,7 +139,7 @@ $queue->consumer()->confirm($job);

```php
/** @var \EasySwoole\Queue\Queue $queue */
$queue->consumer()->setOnBreak(function(\EasySwoole\Queue\Consumer $consumer) {
$queue->consumer("topic")->setOnBreak(function(\EasySwoole\Queue\Consumer $consumer) {
// todo
})->listen(function (\EasySwoole\Queue\Job $job){ });
```
Expand All @@ -114,5 +148,5 @@ $queue->consumer()->setOnBreak(function(\EasySwoole\Queue\Consumer $consumer) {

```php
/** @var \EasySwoole\Queue\Queue $queue */
$queue->consumer()->setBreakTime(0.1);
$queue->consumer("topic")->setBreakTime(0.1);
```
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"require": {
"ext-swoole": ">=4.4.0",
"easyswoole/redis": "^2.0",
"easyswoole/pool": "^1.2"
"easyswoole/pool": "^2.0"
},
"require-dev": {
"easyswoole/swoole-ide-helper": "^1.0"
Expand Down
32 changes: 25 additions & 7 deletions src/Driver/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,33 @@
class RedisQueue implements QueueDriverInterface
{
protected $pool;
protected $queueName;
protected $queueName = 'es_q';
protected $lastCheckDelay = null;

public function __construct(Config $config,string $queueName = 'es_q')
protected $config;

protected $hasInit = false;

public function __construct(Config $config)
{
$this->config = new PoolConfig();
$this->config->setExtraConf($config);
}

public function init(string $topicName, ?string $nodeId): bool
{
$poolConfig = new PoolConfig();
$poolConfig->setExtraConf($config);
$this->pool = new RedisPool($poolConfig);
$this->queueName = $queueName;
if(!$this->hasInit){
$this->hasInit = true;
}
$this->queueName = $topicName;
$this->pool = new RedisPool($this->config);
return true;
}


public function getPoolConfig(): PoolConfig
{
return $this->pool->getConfig();
return $this->config;
}

public function push(Job $job,float $timeout = 3.0): bool
Expand Down Expand Up @@ -155,4 +168,9 @@ public function flush():bool
});
return true;
}

public function __clone()
{

}
}
37 changes: 21 additions & 16 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
namespace EasySwoole\Queue;


use EasySwoole\Queue\Exception\Exception;
use EasySwoole\Utility\Random;

class Queue
{
private $driver;
private $nodeId;

private $consumer;
private $producer;
private $consumer = [];
private $producer = [];

function __construct(QueueDriverInterface $driver)
{
Expand All @@ -25,26 +26,30 @@ function queueDriver():QueueDriverInterface
return $this->driver;
}

function consumer(bool $renew = false):Consumer
function consumer(string $topic,bool $renew = false):Consumer
{
if(!$renew){
$this->consumer = new Consumer($this->driver);
if((!$renew) || !isset($this->consumer[$topic])){
$driver = clone $this->driver;
if(!$driver->init($topic,$this->nodeId)){
throw new Exception("init queue topic:{$topic} driver fail");
}
$temp = new Consumer($driver);
$this->consumer[$topic] = $temp;
}
if($this->consumer == null){
$this->consumer = new Consumer($this->driver);
}
return $this->consumer;
return $this->consumer[$topic];
}

function producer(bool $renew = false):Producer
function producer(string $topic,bool $renew = false):Producer
{
if(!$renew){
$this->producer = new Producer($this->driver, $this->nodeId);
}
if($this->producer == null){
$this->producer = new Producer($this->driver, $this->nodeId);
if((!$renew) || !isset($this->producer[$topic])){
$driver = clone $this->driver;
if(!$driver->init($topic,$this->nodeId)){
throw new Exception("init queue topic:{$topic} driver fail");
}
$temp = new Producer($driver,$this->nodeId);
$this->producer[$topic] = $temp;
}
return $this->producer;
return $this->producer[$topic];
}

function info():?array
Expand Down
3 changes: 3 additions & 0 deletions src/QueueDriverInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

interface QueueDriverInterface
{
public function init(string $topicName,?string $nodeId):bool;
public function push(Job $job,float $timeout = 3.0): bool;

public function pop(float $timeout = 3.0, ?array $params = null): ?Job;
Expand All @@ -15,4 +16,6 @@ public function info(): ?array;
public function confirm(Job $job,float $timeout = 3.0): bool;

public function flush():bool;

public function __clone();
}

0 comments on commit f354996

Please sign in to comment.