Skip to content

Commit

Permalink
Make [rR]edisBackend public
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro committed Jul 12, 2023
1 parent 49d5539 commit 49317f4
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions backends/redis/redis_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,54 +29,55 @@ var (
ErrInvalidAddr = errors.New("invalid connecton string: see documentation for valid connection strings")
)

// redisBackend is a Redis-backed neoq backend
type redisBackend struct {
// RedisBackend is a Redis-backed neoq backend
// nolint: revive
type RedisBackend struct {
types.Backend
client *asynq.Client
server *asynq.Server
mux *asynq.ServeMux
config *config.Config
logger logging.Logger
mu *sync.Mutex // mutext to protect mutating backend state
taskProvider *MemoryTaskConfigProvider
taskProvider *memoryTaskConfigProvider
mgr *asynq.PeriodicTaskManager
}

type MemoryTaskConfigProvider struct {
type memoryTaskConfigProvider struct {
mu *sync.Mutex
configs []*asynq.PeriodicTaskConfig
}

// NewMemoryTaskConfigProvider returns a new asynq MemoryTaskConfigProvider
func NewMemoryTaskConfigProvider() (p *MemoryTaskConfigProvider) {
p = &MemoryTaskConfigProvider{
// newMemoryTaskConfigProvider returns a new asynq MemoryTaskConfigProvider
func newMemoryTaskConfigProvider() (p *memoryTaskConfigProvider) {
p = &memoryTaskConfigProvider{
mu: &sync.Mutex{},
configs: []*asynq.PeriodicTaskConfig{},
}
return
}

// GetConfigs returns this provider's periodic task configurations
func (m *MemoryTaskConfigProvider) GetConfigs() (c []*asynq.PeriodicTaskConfig, err error) {
func (m *memoryTaskConfigProvider) GetConfigs() (c []*asynq.PeriodicTaskConfig, err error) {
m.mu.Lock()
cfgs := m.configs
m.mu.Unlock()
return cfgs, nil
}

// addConfig adds a periodic task configuration to this provider's configs
func (m *MemoryTaskConfigProvider) addConfig(taskConfig *asynq.PeriodicTaskConfig) {
func (m *memoryTaskConfigProvider) addConfig(taskConfig *asynq.PeriodicTaskConfig) {
m.mu.Lock()
m.configs = append(m.configs, taskConfig)
m.mu.Unlock()
}

// Backend is a [config.BackendInitializer] that initializes a new Redis-backed neoq backend
func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend, err error) {
b := &redisBackend{
b := &RedisBackend{
config: config.New(),
mu: &sync.Mutex{},
taskProvider: NewMemoryTaskConfigProvider(),
taskProvider: newMemoryTaskConfigProvider(),
}

for _, opt := range opts {
Expand Down Expand Up @@ -176,7 +177,7 @@ func WithShutdownTimeout(timeout time.Duration) config.Option {
}

// Enqueue queues jobs to be executed asynchronously
func (b *redisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) {
func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) {
if job.Queue == "" {
err = jobs.ErrNoQueueSpecified
return
Expand Down Expand Up @@ -207,7 +208,7 @@ func (b *redisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string
}

// Start starts processing jobs with the specified queue and handler
func (b *redisBackend) Start(_ context.Context, queue string, h handler.Handler) (err error) {
func (b *RedisBackend) Start(_ context.Context, queue string, h handler.Handler) (err error) {
b.mux.HandleFunc(queue, func(ctx context.Context, t *asynq.Task) (err error) {
var p map[string]any
if err = json.Unmarshal(t.Payload(), &p); err != nil {
Expand Down Expand Up @@ -235,7 +236,7 @@ func (b *redisBackend) Start(_ context.Context, queue string, h handler.Handler)
// StartCron starts processing jobs with the specified cron schedule and handler
//
// See: https://pkg.go.dev/github.com/robfig/cron?#hdr-CRON_Expression_Format for details on the cron spec format
func (b *redisBackend) StartCron(ctx context.Context, cronSpec string, h handler.Handler) (err error) {
func (b *RedisBackend) StartCron(ctx context.Context, cronSpec string, h handler.Handler) (err error) {
cd, err := crondescriptor.NewCronDescriptor(cronSpec)
if err != nil {
return fmt.Errorf("error creating cron descriptor: %w", err)
Expand Down Expand Up @@ -297,12 +298,12 @@ func toAsynqCronspec(cronSpec string) string {
}

// SetLogger sets this backend's logger
func (b *redisBackend) SetLogger(logger logging.Logger) {
func (b *RedisBackend) SetLogger(logger logging.Logger) {
b.logger = logger
}

// Shutdown halts the worker
func (b *redisBackend) Shutdown(ctx context.Context) {
func (b *RedisBackend) Shutdown(ctx context.Context) {
b.client.Close()
b.server.Shutdown()
}
Expand Down

0 comments on commit 49317f4

Please sign in to comment.