From 0425f65e20d49b7fe7deba588631bf24becc6cbc Mon Sep 17 00:00:00 2001 From: microrain Date: Wed, 21 Feb 2024 08:43:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=96=B0=E5=A2=9E=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E9=98=9F=E5=88=97=E7=9A=84=E5=BC=80=E5=8F=91=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- develop/task/_category_.json | 7 +++++ develop/task/intro.md | 36 ++++++++++++++++++++++++++ develop/task/queue.md | 28 ++++++++++++++++++++ develop/task/timed.md | 50 ++++++++++++++++++++++++++++++++++++ 4 files changed, 121 insertions(+) create mode 100644 develop/task/_category_.json create mode 100644 develop/task/intro.md create mode 100644 develop/task/queue.md create mode 100644 develop/task/timed.md diff --git a/develop/task/_category_.json b/develop/task/_category_.json new file mode 100644 index 0000000..c299369 --- /dev/null +++ b/develop/task/_category_.json @@ -0,0 +1,7 @@ +{ + "label": "任务队列", + "position": 3, + "link": { + "type": "generated-index" + } +} diff --git a/develop/task/intro.md b/develop/task/intro.md new file mode 100644 index 0000000..a96b9db --- /dev/null +++ b/develop/task/intro.md @@ -0,0 +1,36 @@ +# 概要说明 + +SagooIoT系统支持任务队列(Task Queue)处理。任务队列一般用于跨线程或跨计算机分配工作的一种机制。其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。 +任务队列的输入是称为任务(Task)的工作单元。专用的工作进程不断监视任务队列以查找要执行的新工作。 + +## 实现说明 +SagooIoT的任务队列基于Asynq实现,支持分布式任务,支持定时任务,支持后台任务,支持解耦任务,支持实时处理任务。 +Asynq是一个使用Go语言实现的分布式任务队列和异步处理库,它由Redis提供支持,它提供了轻量级的、易于使用的API,并且具有高可扩展性和高可定制化性。 +Asynq主要由以下几个组件组成: +* 任务(Task):需要被异步执行的操作; +* 处理器(Processor):负责执行任务的工作进程; +* 队列(Queue):存放待执行任务的队列; +* 调度器(Scheduler):根据规则将任务分配给不同的处理器进行执行。 + +关于Asynq的更多信息,请参考[Asynq](https://www.tizi365.com/topic/14001.html)。 + +## 功能特点 +* 保证至少执行一次任务 +* 任务写入Redis后可以持久化 +* 任务失败之后,会自动重试 +* worker崩溃自动恢复 +* 可是实现任务的优先级 +* 任务可以进行编排 +* 任务可以设定执行时间或者最长可执行的时间a +* 支持中间件 +* 可以使用 unique-option 来避免任务重复执行,实现唯一性 +* 支持 Redis Cluster 和 Redis Sentinels 以达成高可用性 + +## 应用场景 + +* 分布式任务:可以将任务分发到多个工作者进程或机器上执行,以提高任务处理速度。 +* 定时任务:可以在指定时间执行任务。例如:每天定时备份数据、日志归档、心跳测试、运维巡检。支持 crontab 定时模式 +* 后台任务:可以在后台执行耗时任务,例如图像处理、数据分析等,不影响用户界面的响应。 +* 解耦任务:可以将任务与主程序解耦,以提高代码的可读性和可维护性,解耦应用程序最直接的好处就是可扩展性和并发性能的提高。支持并发执行任务,同时支持自动动态扩展。 +* 实时处理:可以支持实时处理任务,例如即时通讯、消息队列等。 + diff --git a/develop/task/queue.md b/develop/task/queue.md new file mode 100644 index 0000000..edceb13 --- /dev/null +++ b/develop/task/queue.md @@ -0,0 +1,28 @@ +# 消息队列 + +在SagooIoT中实现消息队列的处理方式很简单,只需要实现一个消息队列处理器即可。 +系统中定义了消息队列的实现接口,可以通过消息队列实现进程间通信。消息队列是消息的链表,每个消息队列都有一个唯一的标识符,用于标识消息队列。消息队列的消息是一个结构体,包含消息分组、唯一ID、消息数据部分。 + +如:消息队列的结构体定义如下: + +```go +type Payload struct { + Group string `json:"group"` + Uid string `json:"uid"` + Payload []byte `json:"payload"` +} +``` + +实现自己的消息队列处理器,需要实现以下接口: + +```go +type Queue interface { + // 返回消息队列的主题 + GetTopic() string + + // 处理消息队列的消息 + Handle(ctx context.Context, p worker.Payload) (err error) +} +``` + +可以参考工程中实现的系统操作日志记录的处理过程的处理。位置在:`internal/queues/sysOperLog.go`。 diff --git a/develop/task/timed.md b/develop/task/timed.md new file mode 100644 index 0000000..be435e9 --- /dev/null +++ b/develop/task/timed.md @@ -0,0 +1,50 @@ +# 定时任务 + +SagooIoT基于Asynq实现了定时任务的处理。定时任务是指在指定的时间点执行任务。例如:每天定时备份数据、清理过期日志、数据同步等。 + +系统中统一在`internal/tasks`目录下实现定时任务的处理器。实现起来很简单,只需要实现处理的方法并添加列表即可。 + +如添加一个访问指定URL的定时任务处理器: + +1,实现任务处理的方法 + +```go + +// GetAccessURL 执行访问URL +func (t TaskJob) GetAccessURL(accessURL string) { + ctx := context.Background() + g.Log().Debug(ctx, "访问URL:", accessURL) + res, err := g.Client().Get(ctx, accessURL) + if err != nil { + g.Log().Error(ctx, err) + } + defer func(res *gclient.Response) { + if err := res.Close(); err != nil { + g.Log().Error(ctx, err) + } + }(res) +} + +``` +:::warning 注意 +方法必须是公开的方法,方法的参数可以是任意类型,但是必须是一个参数。必须基于TaskJob结构体实现。 +::: + + +2,在`internal/tasks`目录下base.go中添加任务处理器列表 + +```go + +func (t TaskJob) GetFuncNameList() (res map[string]string) { + res = map[string]string{ + "GetAccessURL": "访问URL", // 添加访问指定URL定时任务处理方法的描述 + } + return +} + +``` + +:::warning 注意 +res的key是方法名,value是方法的描述。 + +:::