From 6d53919e37d3194866f9dece57c20c46ccc37100 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Flc=E3=82=9B?= Date: Thu, 21 Dec 2023 11:48:24 +0800 Subject: [PATCH] feat(crontab): Added crontab (#40) --- crontab/README.md | 51 ++++++++++++++ crontab/mutex.go | 15 ++++ crontab/mutex/redis/mutex.go | 77 ++++++++++++++++++++ crontab/server.go | 133 +++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + 6 files changed, 279 insertions(+) create mode 100644 crontab/README.md create mode 100644 crontab/mutex.go create mode 100644 crontab/mutex/redis/mutex.go create mode 100644 crontab/server.go diff --git a/crontab/README.md b/crontab/README.md new file mode 100644 index 0000000..f1a0cdc --- /dev/null +++ b/crontab/README.md @@ -0,0 +1,51 @@ +# Crontab + +## Usage + +```go +package main + +import ( + "fmt" + + "github.com/go-kratos/kratos/v2" + "github.com/redis/go-redis/v9" + "github.com/robfig/cron/v3" + + "pusher/internal/pkg/server/crontab" + redisMutex "pusher/internal/pkg/server/crontab/mutex/redis" +) + +func main() { + rdb := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + }) + + if err := kratos.New( + kratos.Server( + NewCrontabServer(rdb), + ), + ).Run(); err != nil { + panic(err) + } +} + +func NewCrontabServer(rdb redis.Cmdable) *crontab.Server { + c := cron.New( + cron.WithSeconds(), + ) + + c.AddFunc("*/1 * * * * *", func() { + fmt.Println("Every hour on the half hour") + }) + + return crontab.NewServer( + c, + crontab.WithName("crontab:server"), + crontab.WithDebug(), + crontab.WithMutex( + redisMutex.New(rdb), + ), + ) +} +``` \ No newline at end of file diff --git a/crontab/mutex.go b/crontab/mutex.go new file mode 100644 index 0000000..cb17262 --- /dev/null +++ b/crontab/mutex.go @@ -0,0 +1,15 @@ +package crontab + +import ( + "context" + "errors" +) + +var ( + ErrAnotherServerRunning = errors.New("crontab: another server running") +) + +type Mutex interface { + Lock(ctx context.Context, name string) error + Unlock(ctx context.Context, name string) error +} diff --git a/crontab/mutex/redis/mutex.go b/crontab/mutex/redis/mutex.go new file mode 100644 index 0000000..e8bee48 --- /dev/null +++ b/crontab/mutex/redis/mutex.go @@ -0,0 +1,77 @@ +package redis + +import ( + "context" + "time" + + "github.com/google/uuid" + "github.com/redis/go-redis/v9" + + "github.com/go-packagist/go-kratos-components/crontab" +) + +type Mutex struct { + redis.Cmdable + expired time.Duration + serverid string +} + +var _ crontab.Mutex = (*Mutex)(nil) + +type Option func(*Mutex) + +func (m *Mutex) WithExpired(expired time.Duration) Option { + return func(m *Mutex) { + m.expired = expired + } +} + +func New(redis redis.Cmdable, opts ...Option) *Mutex { + m := &Mutex{ + Cmdable: redis, + expired: time.Second * 60, + serverid: uuid.New().String(), + } + + for _, opt := range opts { + opt(m) + } + + return m +} + +func (m *Mutex) Lock(ctx context.Context, name string) error { + if result := m.SetNX(ctx, name, m.serverid, m.expired); result.Err() != nil { + return result.Err() + } else if !result.Val() { + if val, err := m.get(ctx, name); err != nil { + return err + } else if val == m.serverid { + if err := m.refresh(ctx, name); err != nil { + return err + } + + return nil + } + + return crontab.ErrAnotherServerRunning + } + + return nil +} + +func (m *Mutex) get(ctx context.Context, name string) (string, error) { + return m.Get(ctx, name).Result() +} + +func (m *Mutex) refresh(ctx context.Context, name string) error { + return m.Expire(ctx, name, m.expired).Err() +} + +func (m *Mutex) Unlock(ctx context.Context, name string) error { + if result := m.Del(ctx, name); result.Err() != nil { + return result.Err() + } + + return nil +} diff --git a/crontab/server.go b/crontab/server.go new file mode 100644 index 0000000..ad096f6 --- /dev/null +++ b/crontab/server.go @@ -0,0 +1,133 @@ +package crontab + +import ( + "context" + "log" + "sync" + "time" + + "github.com/go-kratos/kratos/v2/transport" + "github.com/robfig/cron/v3" +) + +var _ transport.Server = (*Server)(nil) + +type Server struct { + cron *cron.Cron + + name string + mutex Mutex + + running bool + runningMu sync.Mutex + + stoped chan struct{} + + debug bool +} + +type Option func(*Server) + +func WithName(name string) Option { + return func(s *Server) { + s.name = name + } +} + +func WithMutex(m Mutex) Option { + return func(s *Server) { + s.mutex = m + } +} + +func WithDebug() Option { + return func(s *Server) { + s.debug = true + } +} + +func NewServer(c *cron.Cron, opts ...Option) *Server { + s := &Server{ + cron: c, + name: "cron:server", + stoped: make(chan struct{}), + } + + for _, opt := range opts { + opt(s) + } + + if s.mutex == nil { + panic("crontab: mutex is nil") + } + + return s +} + +func (s *Server) Start(ctx context.Context) error { + go s.run(ctx) + + return nil +} + +func (s *Server) run(ctx context.Context) { + timer := time.NewTicker(time.Second) + defer func() { + _ = s.mutex.Unlock(ctx, s.name) + timer.Stop() + }() + + for { + select { + case <-ctx.Done(): + s.log("crontab: server done") + return + case <-s.stoped: + s.log("crontab: server stoped") + return + case <-timer.C: + if err := s.mutex.Lock(ctx, s.name); err != nil { + s.log(err) + continue + } + + s.start(ctx) + } + } +} + +func (s *Server) start(ctx context.Context) { + s.runningMu.Lock() + defer s.runningMu.Unlock() + + if s.running { + return + } + + s.running = true + s.cron.Start() + + s.log("crontab: server started") +} + +func (s *Server) Stop(ctx context.Context) error { + s.runningMu.Lock() + defer s.runningMu.Unlock() + + if !s.running { + return nil + } + + s.running = false + s.cron.Stop() + + close(s.stoped) + + return s.mutex.Unlock(ctx, s.name) +} + +func (s *Server) log(v ...interface{}) { + if s.debug { + log.Println(v...) + } +} diff --git a/go.mod b/go.mod index dcdce68..4098ea7 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-kratos/kratos/v2 v2.7.2 github.com/google/uuid v1.5.0 github.com/redis/go-redis/v9 v9.3.1 + github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.8.4 gorm.io/gorm v1.25.5 ) diff --git a/go.sum b/go.sum index 8b09952..1326e39 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds= github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=