Skip to content

Commit

Permalink
feat(crontab): Added crontab (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
flc1125 authored Dec 21, 2023
1 parent 2520e53 commit 6d53919
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 0 deletions.
51 changes: 51 additions & 0 deletions crontab/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Crontab

## Usage

```go
package main

import (
"fmt"

"github.com/go-kratos/kratos/v2"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"

"pusher/internal/pkg/server/crontab"
redisMutex "pusher/internal/pkg/server/crontab/mutex/redis"
)

func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})

if err := kratos.New(
kratos.Server(
NewCrontabServer(rdb),
),
).Run(); err != nil {
panic(err)
}
}

func NewCrontabServer(rdb redis.Cmdable) *crontab.Server {
c := cron.New(
cron.WithSeconds(),
)

c.AddFunc("*/1 * * * * *", func() {
fmt.Println("Every hour on the half hour")
})

return crontab.NewServer(
c,
crontab.WithName("crontab:server"),
crontab.WithDebug(),
crontab.WithMutex(
redisMutex.New(rdb),
),
)
}
```
15 changes: 15 additions & 0 deletions crontab/mutex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package crontab

import (
"context"
"errors"
)

var (
ErrAnotherServerRunning = errors.New("crontab: another server running")
)

type Mutex interface {
Lock(ctx context.Context, name string) error
Unlock(ctx context.Context, name string) error
}
77 changes: 77 additions & 0 deletions crontab/mutex/redis/mutex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package redis

import (
"context"
"time"

"github.com/google/uuid"
"github.com/redis/go-redis/v9"

"github.com/go-packagist/go-kratos-components/crontab"
)

type Mutex struct {
redis.Cmdable
expired time.Duration
serverid string
}

var _ crontab.Mutex = (*Mutex)(nil)

type Option func(*Mutex)

func (m *Mutex) WithExpired(expired time.Duration) Option {
return func(m *Mutex) {
m.expired = expired
}
}

func New(redis redis.Cmdable, opts ...Option) *Mutex {
m := &Mutex{
Cmdable: redis,
expired: time.Second * 60,
serverid: uuid.New().String(),
}

for _, opt := range opts {
opt(m)
}

return m
}

func (m *Mutex) Lock(ctx context.Context, name string) error {
if result := m.SetNX(ctx, name, m.serverid, m.expired); result.Err() != nil {
return result.Err()
} else if !result.Val() {
if val, err := m.get(ctx, name); err != nil {
return err
} else if val == m.serverid {
if err := m.refresh(ctx, name); err != nil {
return err
}

return nil
}

return crontab.ErrAnotherServerRunning
}

return nil
}

func (m *Mutex) get(ctx context.Context, name string) (string, error) {
return m.Get(ctx, name).Result()
}

func (m *Mutex) refresh(ctx context.Context, name string) error {
return m.Expire(ctx, name, m.expired).Err()
}

func (m *Mutex) Unlock(ctx context.Context, name string) error {
if result := m.Del(ctx, name); result.Err() != nil {
return result.Err()
}

return nil
}
133 changes: 133 additions & 0 deletions crontab/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package crontab

import (
"context"
"log"
"sync"
"time"

"github.com/go-kratos/kratos/v2/transport"
"github.com/robfig/cron/v3"
)

var _ transport.Server = (*Server)(nil)

type Server struct {
cron *cron.Cron

name string
mutex Mutex

running bool
runningMu sync.Mutex

stoped chan struct{}

debug bool
}

type Option func(*Server)

func WithName(name string) Option {
return func(s *Server) {
s.name = name
}
}

func WithMutex(m Mutex) Option {
return func(s *Server) {
s.mutex = m
}
}

func WithDebug() Option {
return func(s *Server) {
s.debug = true
}
}

func NewServer(c *cron.Cron, opts ...Option) *Server {
s := &Server{
cron: c,
name: "cron:server",
stoped: make(chan struct{}),
}

for _, opt := range opts {
opt(s)
}

if s.mutex == nil {
panic("crontab: mutex is nil")
}

return s
}

func (s *Server) Start(ctx context.Context) error {
go s.run(ctx)

return nil
}

func (s *Server) run(ctx context.Context) {
timer := time.NewTicker(time.Second)
defer func() {
_ = s.mutex.Unlock(ctx, s.name)
timer.Stop()
}()

for {
select {
case <-ctx.Done():
s.log("crontab: server done")
return
case <-s.stoped:
s.log("crontab: server stoped")
return
case <-timer.C:
if err := s.mutex.Lock(ctx, s.name); err != nil {
s.log(err)
continue
}

s.start(ctx)
}
}
}

func (s *Server) start(ctx context.Context) {
s.runningMu.Lock()
defer s.runningMu.Unlock()

if s.running {
return
}

s.running = true
s.cron.Start()

s.log("crontab: server started")
}

func (s *Server) Stop(ctx context.Context) error {
s.runningMu.Lock()
defer s.runningMu.Unlock()

if !s.running {
return nil
}

s.running = false
s.cron.Stop()

close(s.stoped)

return s.mutex.Unlock(ctx, s.name)
}

func (s *Server) log(v ...interface{}) {
if s.debug {
log.Println(v...)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/go-kratos/kratos/v2 v2.7.2
github.com/google/uuid v1.5.0
github.com/redis/go-redis/v9 v9.3.1
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.8.4
gorm.io/gorm v1.25.5
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds=
github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
Expand Down

0 comments on commit 6d53919

Please sign in to comment.