-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
microrain
authored and
microrain
committed
Feb 21, 2024
1 parent
37b4c05
commit 0425f65
Showing
4 changed files
with
121 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"label": "任务队列", | ||
"position": 3, | ||
"link": { | ||
"type": "generated-index" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# 概要说明 | ||
|
||
SagooIoT系统支持任务队列(Task Queue)处理。任务队列一般用于跨线程或跨计算机分配工作的一种机制。其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。 | ||
任务队列的输入是称为任务(Task)的工作单元。专用的工作进程不断监视任务队列以查找要执行的新工作。 | ||
|
||
## 实现说明 | ||
SagooIoT的任务队列基于Asynq实现,支持分布式任务,支持定时任务,支持后台任务,支持解耦任务,支持实时处理任务。 | ||
Asynq是一个使用Go语言实现的分布式任务队列和异步处理库,它由Redis提供支持,它提供了轻量级的、易于使用的API,并且具有高可扩展性和高可定制化性。 | ||
Asynq主要由以下几个组件组成: | ||
* 任务(Task):需要被异步执行的操作; | ||
* 处理器(Processor):负责执行任务的工作进程; | ||
* 队列(Queue):存放待执行任务的队列; | ||
* 调度器(Scheduler):根据规则将任务分配给不同的处理器进行执行。 | ||
|
||
关于Asynq的更多信息,请参考[Asynq](https://www.tizi365.com/topic/14001.html)。 | ||
|
||
## 功能特点 | ||
* 保证至少执行一次任务 | ||
* 任务写入Redis后可以持久化 | ||
* 任务失败之后,会自动重试 | ||
* worker崩溃自动恢复 | ||
* 可是实现任务的优先级 | ||
* 任务可以进行编排 | ||
* 任务可以设定执行时间或者最长可执行的时间a | ||
* 支持中间件 | ||
* 可以使用 unique-option 来避免任务重复执行,实现唯一性 | ||
* 支持 Redis Cluster 和 Redis Sentinels 以达成高可用性 | ||
|
||
## 应用场景 | ||
|
||
* 分布式任务:可以将任务分发到多个工作者进程或机器上执行,以提高任务处理速度。 | ||
* 定时任务:可以在指定时间执行任务。例如:每天定时备份数据、日志归档、心跳测试、运维巡检。支持 crontab 定时模式 | ||
* 后台任务:可以在后台执行耗时任务,例如图像处理、数据分析等,不影响用户界面的响应。 | ||
* 解耦任务:可以将任务与主程序解耦,以提高代码的可读性和可维护性,解耦应用程序最直接的好处就是可扩展性和并发性能的提高。支持并发执行任务,同时支持自动动态扩展。 | ||
* 实时处理:可以支持实时处理任务,例如即时通讯、消息队列等。 | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# 消息队列 | ||
|
||
在SagooIoT中实现消息队列的处理方式很简单,只需要实现一个消息队列处理器即可。 | ||
系统中定义了消息队列的实现接口,可以通过消息队列实现进程间通信。消息队列是消息的链表,每个消息队列都有一个唯一的标识符,用于标识消息队列。消息队列的消息是一个结构体,包含消息分组、唯一ID、消息数据部分。 | ||
|
||
如:消息队列的结构体定义如下: | ||
|
||
```go | ||
type Payload struct { | ||
Group string `json:"group"` | ||
Uid string `json:"uid"` | ||
Payload []byte `json:"payload"` | ||
} | ||
``` | ||
|
||
实现自己的消息队列处理器,需要实现以下接口: | ||
|
||
```go | ||
type Queue interface { | ||
// 返回消息队列的主题 | ||
GetTopic() string | ||
|
||
// 处理消息队列的消息 | ||
Handle(ctx context.Context, p worker.Payload) (err error) | ||
} | ||
``` | ||
|
||
可以参考工程中实现的系统操作日志记录的处理过程的处理。位置在:`internal/queues/sysOperLog.go`。 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
# 定时任务 | ||
|
||
SagooIoT基于Asynq实现了定时任务的处理。定时任务是指在指定的时间点执行任务。例如:每天定时备份数据、清理过期日志、数据同步等。 | ||
|
||
系统中统一在`internal/tasks`目录下实现定时任务的处理器。实现起来很简单,只需要实现处理的方法并添加列表即可。 | ||
|
||
如添加一个访问指定URL的定时任务处理器: | ||
|
||
1,实现任务处理的方法 | ||
|
||
```go | ||
|
||
// GetAccessURL 执行访问URL | ||
func (t TaskJob) GetAccessURL(accessURL string) { | ||
ctx := context.Background() | ||
g.Log().Debug(ctx, "访问URL:", accessURL) | ||
res, err := g.Client().Get(ctx, accessURL) | ||
if err != nil { | ||
g.Log().Error(ctx, err) | ||
} | ||
defer func(res *gclient.Response) { | ||
if err := res.Close(); err != nil { | ||
g.Log().Error(ctx, err) | ||
} | ||
}(res) | ||
} | ||
|
||
``` | ||
:::warning 注意 | ||
方法必须是公开的方法,方法的参数可以是任意类型,但是必须是一个参数。必须基于TaskJob结构体实现。 | ||
::: | ||
|
||
|
||
2,在`internal/tasks`目录下base.go中添加任务处理器列表 | ||
|
||
```go | ||
|
||
func (t TaskJob) GetFuncNameList() (res map[string]string) { | ||
res = map[string]string{ | ||
"GetAccessURL": "访问URL", // 添加访问指定URL定时任务处理方法的描述 | ||
} | ||
return | ||
} | ||
|
||
``` | ||
|
||
:::warning 注意 | ||
res的key是方法名,value是方法的描述。 | ||
|
||
::: |