From b5083b6f6331cc60cb67936149c5ce6ae7d6a1db Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Tue, 19 Dec 2023 20:07:19 -0800 Subject: [PATCH] Move jobs to mw --- go.mod | 7 +- go.sum | 12 +- jobs/jobs.go | 59 ---------- jobs/jobs_test.go | 21 ---- jobs/local_jobs.go | 147 ------------------------ jobs/local_test.go | 81 ------------- jobs/log.go | 31 ----- jobs/redis_jobs.go | 185 ------------------------------ jobs/redis_test.go | 91 --------------- model/config.go | 2 + server/server_cmd.go | 21 ++-- workers/fetch_enqueue_worker.go | 5 +- workers/gbfs_fetch_worker.go | 4 +- workers/gbfs_fetch_worker_test.go | 2 +- workers/rt_fetch_worker.go | 4 +- workers/static_fetch_worker.go | 4 +- workers/test_worker.go | 6 +- workers/workers.go | 22 ++-- 18 files changed, 46 insertions(+), 658 deletions(-) delete mode 100644 jobs/jobs.go delete mode 100644 jobs/jobs_test.go delete mode 100644 jobs/local_jobs.go delete mode 100644 jobs/local_test.go delete mode 100644 jobs/log.go delete mode 100644 jobs/redis_jobs.go delete mode 100644 jobs/redis_test.go diff --git a/go.mod b/go.mod index 0112e132..0cd39084 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/aws/aws-sdk-go-v2 v1.17.5 github.com/aws/aws-sdk-go-v2/config v1.18.15 github.com/aws/aws-sdk-go-v2/service/location v1.22.1 - github.com/digitalocean/go-workers2 v0.10.3 github.com/flopp/go-staticmaps v0.0.0-20220221183018-c226716bec53 github.com/go-chi/chi v1.5.4 github.com/go-chi/chi/v5 v5.0.10 @@ -21,8 +20,8 @@ require ( github.com/graph-gophers/dataloader/v7 v7.1.0 github.com/hypirion/go-filecache v0.0.0-20160810125507-e3e6ef6981f0 github.com/interline-io/log v0.0.0-20231211003339-8bdc406adcd2 - github.com/interline-io/transitland-lib v0.14.0-rc1.0.20231202005632-a9ea742322f7 - github.com/interline-io/transitland-mw v0.0.0-20231211012518-4cd4a8535e63 + github.com/interline-io/transitland-lib v0.14.0 + github.com/interline-io/transitland-mw v0.0.0-20231220040358-e71272cbbbdb github.com/jellydator/ttlcache/v2 v2.11.1 github.com/jmoiron/sqlx v1.3.5 github.com/lib/pq v1.10.7 @@ -69,6 +68,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/digitalocean/go-workers2 v0.10.4 // indirect github.com/dimchansky/utfbom v1.1.1 // indirect github.com/flopp/go-coordsparser v0.0.0-20201115094714-8baaeb7062d5 // indirect github.com/fogleman/gg v1.3.0 // indirect @@ -125,3 +125,4 @@ require ( ) // replace github.com/interline-io/transitland-lib => /Users/irees/src/interline-io/transitland-lib +// replace github.com/interline-io/transitland-mw => /Users/irees/src/interline-io/transitland-mw diff --git a/go.sum b/go.sum index 01ebaec4..f2c1b109 100644 --- a/go.sum +++ b/go.sum @@ -134,8 +134,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= -github.com/digitalocean/go-workers2 v0.10.3 h1:qs7MELMMQybjSxmvFqvedXJAqkF7f5vC+QnVa1dmUwA= -github.com/digitalocean/go-workers2 v0.10.3/go.mod h1:4F0tUdjgqV9ZOGCTmn5Cr58CVshMI5o0SmNx6e5T98o= +github.com/digitalocean/go-workers2 v0.10.4 h1:3GTz14m2gaPPzUEEXBLLK0Y/jyWTYi5QKcSbrCbyIJQ= +github.com/digitalocean/go-workers2 v0.10.4/go.mod h1:4F0tUdjgqV9ZOGCTmn5Cr58CVshMI5o0SmNx6e5T98o= github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U= github.com/dimchansky/utfbom v1.1.1/go.mod h1:SxdoEBH5qIqFocHMyGOXVAybYJdr71b1Q/j0mACtrfE= github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= @@ -270,10 +270,10 @@ github.com/iancoleman/orderedmap v0.2.0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/interline-io/log v0.0.0-20231211003339-8bdc406adcd2 h1:ScRM8Kr6UwAvblyGMdupRwEy7eWCROPLWmGJf1J1aOk= github.com/interline-io/log v0.0.0-20231211003339-8bdc406adcd2/go.mod h1:chJaM8SKcHI6ivoeFuZ8M8axTjSV4TPmuQ+sAyAHa34= -github.com/interline-io/transitland-lib v0.14.0-rc1.0.20231202005632-a9ea742322f7 h1:rwkKzYzl05Q4TM++L9RIJbXPjIvURoMN5vEr8dvUV/Q= -github.com/interline-io/transitland-lib v0.14.0-rc1.0.20231202005632-a9ea742322f7/go.mod h1:UcfuCX6DyKt/yn5GECFn3jQ6NcZEjt5XyPjf8a3tXZ4= -github.com/interline-io/transitland-mw v0.0.0-20231211012518-4cd4a8535e63 h1:ZaqjWLLHAUcg6giXwdnDvzqlQJxYavG5JRdfm+zvZ3c= -github.com/interline-io/transitland-mw v0.0.0-20231211012518-4cd4a8535e63/go.mod h1:4rAXsCPb7miaAmLOoKjWZSBVCUBiTU5KdisG8orDyYQ= +github.com/interline-io/transitland-lib v0.14.0 h1:kjUIk+2BtzBY2yafXoRQ4MMcatHydDg6HOtwbzF3mEA= +github.com/interline-io/transitland-lib v0.14.0/go.mod h1:TwGmZjX/iJRaYWKKosU/TIn8Dt4dywYCAXqmZwAA6qo= +github.com/interline-io/transitland-mw v0.0.0-20231220040358-e71272cbbbdb h1:gt3chseddTsXkMzJEQstFlIAwwxU7IpeKRm4mViN/5s= +github.com/interline-io/transitland-mw v0.0.0-20231220040358-e71272cbbbdb/go.mod h1:QyssKyKXwbTWhFT9O08nVj+gScPntMvLh1qMGNr1Ksw= github.com/jarcoal/httpmock v1.3.1 h1:iUx3whfZWVf3jT01hQTO/Eo5sAYtB2/rqaUuOtpInww= github.com/jarcoal/httpmock v1.3.1/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg= github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4= diff --git a/jobs/jobs.go b/jobs/jobs.go deleted file mode 100644 index 4f302dda..00000000 --- a/jobs/jobs.go +++ /dev/null @@ -1,59 +0,0 @@ -package jobs - -import ( - "context" - "crypto/sha1" - "encoding/hex" - "encoding/json" - - "github.com/interline-io/transitland-lib/tl" - "github.com/rs/zerolog" -) - -type JobArgs map[string]interface{} - -// Job queue -type JobQueue interface { - AddJob(Job) error - AddWorker(string, GetWorker, JobOptions, int) error - Use(JobMiddleware) - Run() error - Stop() error -} - -// Job defines a single job -type Job struct { - Queue string `json:"queue"` - JobType string `json:"job_type"` - JobArgs JobArgs `json:"job_args"` - Unique bool `json:"unique"` - JobDeadline int64 `json:"job_deadline"` - Opts JobOptions `json:"-"` - jobId string `json:"-"` -} - -func (job *Job) HexKey() (string, error) { - bytes, err := json.Marshal(job.JobArgs) - if err != nil { - return "", err - } - sum := sha1.Sum(bytes) - return job.JobType + ":" + hex.EncodeToString(sum[:]), nil -} - -// JobOptions is configuration passed to worker. -type JobOptions struct { - JobQueue JobQueue - Logger zerolog.Logger - Secrets []tl.Secret -} - -// GetWorker returns a new worker for this job type -type GetWorker func(Job) (JobWorker, error) - -// JobWorker defines a job worker -type JobWorker interface { - Run(context.Context, Job) error -} - -type JobMiddleware func(JobWorker) JobWorker diff --git a/jobs/jobs_test.go b/jobs/jobs_test.go deleted file mode 100644 index 6861d840..00000000 --- a/jobs/jobs_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package jobs - -import ( - "context" - "sync/atomic" - "time" -) - -var ( - feeds = []string{"BA", "SF", "AC", "CT"} -) - -type testWorker struct { - count *int64 -} - -func (t *testWorker) Run(ctx context.Context, job Job) error { - time.Sleep(10 * time.Millisecond) - atomic.AddInt64(t.count, 1) - return nil -} diff --git a/jobs/local_jobs.go b/jobs/local_jobs.go deleted file mode 100644 index 3291ddc5..00000000 --- a/jobs/local_jobs.go +++ /dev/null @@ -1,147 +0,0 @@ -package jobs - -import ( - "context" - "errors" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/interline-io/log" -) - -var jobCounter = uint64(0) - -type LocalJobs struct { - jobs chan Job - jobfuncs []func(Job) error - running bool - middlewares []JobMiddleware - uniqueJobs map[string]bool - uniqueJobsLock sync.Mutex -} - -func NewLocalJobs() *LocalJobs { - f := &LocalJobs{ - jobs: make(chan Job, 1000), - uniqueJobs: map[string]bool{}, - } - f.middlewares = append(f.middlewares, newLog()) - return f -} - -func (f *LocalJobs) Use(mwf JobMiddleware) { - f.middlewares = append(f.middlewares, mwf) -} - -func (f *LocalJobs) AddJob(job Job) error { - if f.jobs == nil { - return errors.New("closed") - } - if job.Unique { - f.uniqueJobsLock.Lock() - defer f.uniqueJobsLock.Unlock() - key, err := job.HexKey() - if err != nil { - return err - } - if _, ok := f.uniqueJobs[key]; ok { - log.Trace().Interface("job", job).Msgf("already locked: %s", key) - return nil - } else { - f.uniqueJobs[key] = true - log.Trace().Interface("job", job).Msgf("locked: %s", key) - } - } - f.jobs <- job - log.Info().Interface("job", job).Msg("jobs: added job") - return nil -} - -func (f *LocalJobs) processMessage(getWorker GetWorker, jo JobOptions, job Job) error { - job = Job{ - JobType: job.JobType, - JobArgs: job.JobArgs, - JobDeadline: job.JobDeadline, - Unique: job.Unique, - Opts: jo, - jobId: fmt.Sprintf("%d", atomic.AddUint64(&jobCounter, 1)), - } - now := time.Now().In(time.UTC).Unix() - if job.JobDeadline > 0 && job.JobDeadline < now { - log.Trace().Int64("job_deadline", job.JobDeadline).Int64("now", now).Msg("job skipped - deadline in past") - return nil - } - if job.Unique { - f.uniqueJobsLock.Lock() - defer f.uniqueJobsLock.Unlock() - key, err := job.HexKey() - if err != nil { - return err - } - delete(f.uniqueJobs, key) - log.Trace().Interface("job", job).Msgf("unlocked: %s", key) - } - w, err := getWorker(job) - if err != nil { - return err - } - if w == nil { - return errors.New("no job") - } - for _, mwf := range f.middlewares { - w = mwf(w) - if w == nil { - return errors.New("no job") - } - } - return w.Run(context.TODO(), job) -} - -func (f *LocalJobs) AddWorker(queue string, getWorker GetWorker, jo JobOptions, count int) error { - if f.running { - return errors.New("already running") - } - processMessage := func(job Job) error { - return f.processMessage(getWorker, jo, job) - } - log.Infof("jobs: created job listener") - for i := 0; i < count; i++ { - f.jobfuncs = append(f.jobfuncs, processMessage) - } - return nil -} - -func (f *LocalJobs) Run() error { - if f.running { - return errors.New("already running") - } - log.Infof("jobs: running") - f.running = true - var wg sync.WaitGroup - for _, jobfunc := range f.jobfuncs { - wg.Add(1) - go func(jf func(Job) error, w *sync.WaitGroup) { - for job := range f.jobs { - if err := jf(job); err != nil { - log.Trace().Err(err).Msg("job failed") - } - } - wg.Done() - }(jobfunc, &wg) - } - wg.Wait() - return nil -} - -func (f *LocalJobs) Stop() error { - if !f.running { - return errors.New("not running") - } - log.Infof("jobs: stopping") - close(f.jobs) - f.running = false - f.jobs = nil - return nil -} diff --git a/jobs/local_test.go b/jobs/local_test.go deleted file mode 100644 index dc89f8bb..00000000 --- a/jobs/local_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package jobs - -import ( - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestLocalJobs(t *testing.T) { - t.Run("simple", func(t *testing.T) { - rtJobs := NewLocalJobs() - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 1) - for _, feed := range feeds { - rtJobs.AddJob(Job{JobType: "test", Unique: false, JobArgs: JobArgs{"feed_id": feed}}) - } - go func() { - time.Sleep(200 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, len(feeds), int(count)) - }) - t.Run("unique", func(t *testing.T) { - rtJobs := NewLocalJobs() - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - for i := 0; i < 10; i++ { - // 1 job: j=0 - for j := 0; j < 10; j++ { - job := Job{JobType: "testUnique", Unique: true, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/10)}} - rtJobs.AddJob(job) - } - // 3 jobs; j=3, j=6, j=9... j=0 is not unique - for j := 0; j < 10; j++ { - job := Job{JobType: "testUnique", Unique: true, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/3)}} - rtJobs.AddJob(job) - } - // 10 jobs: j=0, j=0, j=2, j=2, j=4, j=4, j=6 j=6, j=8, j=8 - for j := 0; j < 10; j++ { - job := Job{JobType: "testNotUnique", Unique: false, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/2)}} - rtJobs.AddJob(job) - } - } - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 4) - go func() { - time.Sleep(1000 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, int64(104), count) - }) - t.Run("deadline", func(t *testing.T) { - rtJobs := NewLocalJobs() - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: 0}) - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: time.Now().Add(1 * time.Hour).Unix()}) - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: time.Now().Add(-1 * time.Hour).Unix()}) - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 1) - go func() { - time.Sleep(100 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, int64(2), count) - }) - -} diff --git a/jobs/log.go b/jobs/log.go deleted file mode 100644 index c5391652..00000000 --- a/jobs/log.go +++ /dev/null @@ -1,31 +0,0 @@ -package jobs - -import ( - "context" - "time" - - "github.com/interline-io/log" -) - -type jlog struct { - JobWorker -} - -func (w *jlog) Run(ctx context.Context, job Job) error { - t1 := time.Now() - job.Opts.Logger = log.Logger.With().Str("job_type", job.JobType).Str("job_id", job.jobId).Logger() - job.Opts.Logger.Info().Msg("job: started") - if err := w.JobWorker.Run(ctx, job); err != nil { - job.Opts.Logger.Error().Err(err).Msg("job: error") - return err - } - job.Opts.Logger.Info().Int64("job_time_ms", (time.Now().UnixNano()-t1.UnixNano())/1e6).Msg("job: completed") - return nil - -} - -func newLog() JobMiddleware { - return func(w JobWorker) JobWorker { - return &jlog{JobWorker: w} - } -} diff --git a/jobs/redis_jobs.go b/jobs/redis_jobs.go deleted file mode 100644 index 59743041..00000000 --- a/jobs/redis_jobs.go +++ /dev/null @@ -1,185 +0,0 @@ -package jobs - -import ( - "context" - "errors" - "fmt" - "os" - "strconv" - "time" - - workers "github.com/digitalocean/go-workers2" - "github.com/go-redis/redis/v8" - - "github.com/interline-io/log" -) - -// RedisJobs is a simple wrapper around go-workers -type RedisJobs struct { - queuePrefix string - producer *workers.Producer - manager *workers.Manager - client *redis.Client - middlewares []JobMiddleware -} - -func NewRedisJobs(client *redis.Client, queuePrefix string) *RedisJobs { - f := RedisJobs{ - queuePrefix: queuePrefix, - client: client, - } - f.Use(newLog()) - return &f -} - -type redisJob struct { - JobType string `json:"job_type"` - JobArgs JobArgs `json:"job_args"` - JobDeadline int64 `json:"job_deadline"` - Unique bool `json:"unique"` -} - -func (f *RedisJobs) Use(mwf JobMiddleware) { - f.middlewares = append(f.middlewares, mwf) -} - -func (f *RedisJobs) AddJob(job Job) error { - if f.producer == nil { - var err error - f.producer, err = workers.NewProducerWithRedisClient(workers.Options{ - ProcessID: strconv.Itoa(os.Getpid()), - }, f.client) - if err != nil { - return err - } - } - if job.Unique { - key, err := job.HexKey() - if err != nil { - return err - } - fullKey := fmt.Sprintf("queue:%s:unique:%s", f.queueName(job.Queue), key) - deadlineTtl := time.Duration(60*60) * time.Second - if sec := job.JobDeadline - time.Now().In(time.UTC).Unix(); sec > 0 { - deadlineTtl = time.Duration(sec) * time.Second - } - logMsg := log.Trace().Interface("job", job).Str("key", fullKey).Float64("ttl", deadlineTtl.Seconds()) - if !f.client.SetNX(context.Background(), fullKey, "unique", deadlineTtl).Val() { - logMsg.Msg("unique job already locked") - return nil - } else { - logMsg.Msg("unique job locked") - } - } - rjob := redisJob{ - JobType: job.JobType, - JobArgs: job.JobArgs, - Unique: job.Unique, - JobDeadline: job.JobDeadline, - } - _, err := f.producer.Enqueue(f.queueName(job.Queue), rjob.JobType, rjob) - return err -} - -func (f *RedisJobs) queueName(q string) string { - if q == "" { - q = "default" - } - return f.queuePrefix + q -} - -func (f *RedisJobs) getManager() (*workers.Manager, error) { - var err error - if f.manager == nil { - f.manager, err = workers.NewManagerWithRedisClient(workers.Options{ - ProcessID: strconv.Itoa(os.Getpid()), - }, f.client) - } - return f.manager, err -} - -func (f *RedisJobs) processMessage(queueName string, getWorker GetWorker, jo JobOptions, msg *workers.Msg) error { - j := msg.Args() - job := Job{ - JobType: msg.Class(), - jobId: msg.Jid(), - Queue: queueName, - Opts: jo, - } - job.JobArgs, _ = j.Get("job_args").Map() - job.JobDeadline, _ = j.Get("job_deadline").Int64() - job.Unique, _ = j.Get("unique").Bool() - now := time.Now().In(time.UTC).Unix() - if job.Unique { - // Consider more advanced locking options - key, err := job.HexKey() - if err != nil { - return err - } - fullKey := fmt.Sprintf("queue:%s:unique:%s", f.queueName(job.Queue), key) - ctx := context.Background() - logMsg := log.Trace().Str("key", fullKey) - defer func() { - if result, err := f.client.Del(ctx, fullKey).Result(); err != nil { - logMsg.Err(err).Msg("error unlocking job!") - } else { - logMsg.Int64("result", result).Msg("unique job unlocked") - } - }() - } - if job.JobDeadline > 0 && now > job.JobDeadline { - log.Trace().Int64("job_deadline", job.JobDeadline).Int64("now", now).Msg("job skipped - deadline in past") - return nil - } - w, err := getWorker(job) - if err != nil { - return err - } - if w == nil { - return errors.New("no job") - } - for _, mwf := range f.middlewares { - w = mwf(w) - if w == nil { - return errors.New("no job") - } - } - if err := w.Run(context.TODO(), job); err != nil { - log.Trace().Err(err).Msg("job failed") - } - return nil -} - -func (f *RedisJobs) AddWorker(queue string, getWorker GetWorker, jo JobOptions, count int) error { - manager, err := f.getManager() - if err != nil { - return err - } - processMessage := func(msg *workers.Msg) error { - return f.processMessage(queue, getWorker, jo, msg) - } - if queue == "" { - queue = "default" - } - manager.AddWorker(f.queueName(queue), count, processMessage) - return nil -} - -func (f *RedisJobs) Run() error { - log.Infof("jobs: running") - manager, err := f.getManager() - if err == nil { - // Blocks - manager.Run() - } - return err -} - -func (f *RedisJobs) Stop() error { - log.Infof("jobs: stopping") - manager, err := f.getManager() - if err == nil { - manager.Stop() - } - return err -} diff --git a/jobs/redis_test.go b/jobs/redis_test.go deleted file mode 100644 index f9217d27..00000000 --- a/jobs/redis_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package jobs - -import ( - "fmt" - "os" - "testing" - "time" - - "github.com/interline-io/transitland-server/internal/testutil" - "github.com/stretchr/testify/assert" -) - -func TestRedisJobs(t *testing.T) { - // redis jobs and cache - if a, ok := testutil.CheckTestRedisClient(); !ok { - t.Skip(a) - return - } - client := testutil.MustOpenTestRedisClient() - - t.Run("simple", func(t *testing.T) { - // Ugly :( - rtJobs := NewRedisJobs(client, fmt.Sprintf("queue1:%d:%d", os.Getpid(), time.Now().UnixNano())) - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 1) - for _, feed := range feeds { - rtJobs.AddJob(Job{JobType: "test", Unique: false, JobArgs: JobArgs{"feed_id": feed}}) - } - go func() { - time.Sleep(200 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, len(feeds), int(count)) - }) - t.Run("unique", func(t *testing.T) { - // Abuse the job queue - rtJobs := NewRedisJobs(client, fmt.Sprintf("queue2:%d:%d", os.Getpid(), time.Now().UnixNano())) - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - for i := 0; i < 10; i++ { - // 1 job: j=0 - for j := 0; j < 10; j++ { - job := Job{JobType: "testUnique", Unique: true, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/10)}} - rtJobs.AddJob(job) - } - // 3 jobs; j=3, j=6, j=9... j=0 is not unique - for j := 0; j < 10; j++ { - job := Job{JobType: "testUnique", Unique: true, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/3)}} - rtJobs.AddJob(job) - } - // 10 jobs: j=0, j=0, j=2, j=2, j=4, j=4, j=6 j=6, j=8, j=8 - for j := 0; j < 10; j++ { - job := Job{JobType: "testNotUnique", Unique: false, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/2)}} - rtJobs.AddJob(job) - } - } - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 4) - go func() { - time.Sleep(1000 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, int64(104), count) - }) - t.Run("deadline", func(t *testing.T) { - rtJobs := NewRedisJobs(client, fmt.Sprintf("queue3:%d:%d", os.Getpid(), time.Now().UnixNano())) - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: 0}) - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: time.Now().Add(1 * time.Hour).Unix()}) - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: time.Now().Add(-1 * time.Hour).Unix()}) - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 1) - go func() { - time.Sleep(100 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, int64(2), count) - }) -} diff --git a/model/config.go b/model/config.go index c1b47eee..958b8683 100644 --- a/model/config.go +++ b/model/config.go @@ -5,6 +5,7 @@ import ( "net/http" "github.com/interline-io/transitland-lib/tl" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/internal/clock" "github.com/rs/zerolog" ) @@ -19,6 +20,7 @@ type Config struct { ValidateLargeFiles bool Storage string RTStorage string + JobQueue jobs.JobQueue Logger zerolog.Logger } diff --git a/server/server_cmd.go b/server/server_cmd.go index 76e643b0..1494841a 100644 --- a/server/server_cmd.go +++ b/server/server_cmd.go @@ -25,6 +25,7 @@ import ( "github.com/interline-io/transitland-lib/tl" "github.com/interline-io/transitland-mw/auth/ancheck" "github.com/interline-io/transitland-mw/auth/azcheck" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-mw/lmw" "github.com/interline-io/transitland-mw/meters" "github.com/interline-io/transitland-mw/metrics" @@ -32,7 +33,6 @@ import ( "github.com/interline-io/transitland-server/finders/gbfsfinder" "github.com/interline-io/transitland-server/finders/rtfinder" "github.com/interline-io/transitland-server/internal/dbutil" - "github.com/interline-io/transitland-server/jobs" "github.com/interline-io/transitland-server/model" "github.com/interline-io/transitland-server/server/gql" "github.com/interline-io/transitland-server/server/playground" @@ -230,6 +230,7 @@ func (cmd *Command) Run() error { Secrets: cmd.secrets, Storage: cmd.Storage, RTStorage: cmd.RTStorage, + JobQueue: jobQueue, ValidateLargeFiles: cmd.ValidateLargeFiles, } @@ -337,23 +338,23 @@ func (cmd *Command) Run() error { if cmd.EnableJobsApi || cmd.EnableWorkers { // Start workers/api jobWorkers := 8 - jobOptions := jobs.JobOptions{ - Logger: log.Logger, - JobQueue: jobQueue, - } + // jobOptions := jobs.JobOptions{ + // Logger: log.Logger, + // JobQueue: jobQueue, + // } // Add metrics // jobQueue.Use(metrics.NewJobMiddleware("", metricProvider.NewJobMetric("default"))) if cmd.EnableWorkers { log.Infof("Enabling job workers") - jobQueue.AddWorker("default", workers.GetWorker, jobOptions, jobWorkers) - jobQueue.AddWorker("rt-fetch", workers.GetWorker, jobOptions, jobWorkers) - jobQueue.AddWorker("static-fetch", workers.GetWorker, jobOptions, jobWorkers) - jobQueue.AddWorker("gbfs-fetch", workers.GetWorker, jobOptions, jobWorkers) + jobQueue.AddWorker("default", workers.GetWorker, jobWorkers) + jobQueue.AddWorker("rt-fetch", workers.GetWorker, jobWorkers) + jobQueue.AddWorker("static-fetch", workers.GetWorker, jobWorkers) + jobQueue.AddWorker("gbfs-fetch", workers.GetWorker, jobWorkers) go jobQueue.Run() } if cmd.EnableJobsApi { log.Infof("Enabling job api") - jobServer, err := workers.NewServer("", jobWorkers, jobOptions) + jobServer, err := workers.NewServer("", jobWorkers) if err != nil { return err } diff --git a/workers/fetch_enqueue_worker.go b/workers/fetch_enqueue_worker.go index 100a109f..df29d4a5 100644 --- a/workers/fetch_enqueue_worker.go +++ b/workers/fetch_enqueue_worker.go @@ -5,8 +5,8 @@ import ( "time" "github.com/interline-io/transitland-lib/tl" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/actions" - "github.com/interline-io/transitland-server/jobs" "github.com/interline-io/transitland-server/model" ) @@ -18,7 +18,6 @@ type FetchEnqueueWorker struct { func (w *FetchEnqueueWorker) Run(ctx context.Context, job jobs.Job) error { cfg := model.ForContext(ctx) db := cfg.Finder.DBX() - opts := job.Opts now := time.Now().In(time.UTC) feeds, err := cfg.Finder.FindFeeds(ctx, nil, nil, nil, &model.FeedFilter{}) if err != nil { @@ -149,7 +148,7 @@ func (w *FetchEnqueueWorker) Run(ctx context.Context, job jobs.Job) error { } for _, j := range jj { - if err := opts.JobQueue.AddJob(j); err != nil { + if err := job.JobQueue.AddJob(j); err != nil { return err } } diff --git a/workers/gbfs_fetch_worker.go b/workers/gbfs_fetch_worker.go index e1a473b0..3d3639db 100644 --- a/workers/gbfs_fetch_worker.go +++ b/workers/gbfs_fetch_worker.go @@ -7,8 +7,8 @@ import ( "time" "github.com/interline-io/transitland-lib/tldb" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/internal/gbfs" - "github.com/interline-io/transitland-server/jobs" "github.com/interline-io/transitland-server/model" ) @@ -20,7 +20,7 @@ type GbfsFetchWorker struct { func (w *GbfsFetchWorker) Run(ctx context.Context, job jobs.Job) error { cfg := model.ForContext(ctx) - log := job.Opts.Logger.With().Str("feed_id", w.FeedID).Str("url", w.Url).Logger() + log := job.Logger.With().Str("feed_id", w.FeedID).Str("url", w.Url).Logger() gfeeds, err := cfg.Finder.FindFeeds(ctx, nil, nil, nil, &model.FeedFilter{OnestopID: &w.FeedID}) if err != nil { log.Error().Err(err).Msg("gbfsfetch worker: error loading source feed") diff --git a/workers/gbfs_fetch_worker_test.go b/workers/gbfs_fetch_worker_test.go index a6936244..f214858d 100644 --- a/workers/gbfs_fetch_worker_test.go +++ b/workers/gbfs_fetch_worker_test.go @@ -5,10 +5,10 @@ import ( "net/http/httptest" "testing" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/internal/gbfs" "github.com/interline-io/transitland-server/internal/testconfig" "github.com/interline-io/transitland-server/internal/testutil" - "github.com/interline-io/transitland-server/jobs" "github.com/stretchr/testify/assert" "github.com/interline-io/transitland-server/model" diff --git a/workers/rt_fetch_worker.go b/workers/rt_fetch_worker.go index 2e515beb..bdc22262 100644 --- a/workers/rt_fetch_worker.go +++ b/workers/rt_fetch_worker.go @@ -3,8 +3,8 @@ package workers import ( "context" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/actions" - "github.com/interline-io/transitland-server/jobs" ) type RTFetchWorker struct { @@ -16,7 +16,7 @@ type RTFetchWorker struct { } func (w *RTFetchWorker) Run(ctx context.Context, job jobs.Job) error { - log := job.Opts.Logger.With().Str("target", w.Target).Str("source_feed_id", w.SourceFeedID).Str("source_type", w.SourceType).Str("url", w.Url).Logger() + log := job.Logger.With().Str("target", w.Target).Str("source_feed_id", w.SourceFeedID).Str("source_type", w.SourceType).Str("url", w.Url).Logger() err := actions.RTFetch(ctx, w.Target, w.SourceFeedID, w.Url, w.SourceType) if err != nil { log.Error().Err(err).Msg("rtfetch worker: request failed") diff --git a/workers/static_fetch_worker.go b/workers/static_fetch_worker.go index b0ba4b9c..f82e839c 100644 --- a/workers/static_fetch_worker.go +++ b/workers/static_fetch_worker.go @@ -4,8 +4,8 @@ import ( "context" "errors" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/actions" - "github.com/interline-io/transitland-server/jobs" ) type StaticFetchWorker struct { @@ -15,7 +15,7 @@ type StaticFetchWorker struct { } func (w *StaticFetchWorker) Run(ctx context.Context, job jobs.Job) error { - log := job.Opts.Logger.With().Str("feed_id", w.FeedID).Str("feed_url", w.FeedUrl).Logger() + log := job.Logger.With().Str("feed_id", w.FeedID).Str("feed_url", w.FeedUrl).Logger() if result, err := actions.StaticFetch(ctx, w.FeedID, nil, w.FeedUrl); err != nil { log.Error().Err(err).Msg("staticfetch worker: request failed") return err diff --git a/workers/test_worker.go b/workers/test_worker.go index 2412af25..e2dd0cbd 100644 --- a/workers/test_worker.go +++ b/workers/test_worker.go @@ -4,19 +4,19 @@ import ( "context" "errors" - "github.com/interline-io/transitland-server/jobs" + "github.com/interline-io/transitland-mw/jobs" ) type testOkWorker struct{} func (w *testOkWorker) Run(ctx context.Context, job jobs.Job) error { - job.Opts.Logger.Info().Msg("testOkWorker") + job.Logger.Info().Msg("testOkWorker") return nil } type testFailWorker struct{} func (w *testFailWorker) Run(ctx context.Context, job jobs.Job) error { - job.Opts.Logger.Error().Msg("testFailWorker") + job.Logger.Error().Msg("testFailWorker") return errors.New("testFailWorker") } diff --git a/workers/workers.go b/workers/workers.go index 9f06bf18..02674783 100644 --- a/workers/workers.go +++ b/workers/workers.go @@ -7,8 +7,9 @@ import ( "net/http" "github.com/go-chi/chi/v5" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/internal/util" - "github.com/interline-io/transitland-server/jobs" + "github.com/interline-io/transitland-server/model" ) // GetWorker returns the correct worker type for this job. @@ -43,16 +44,16 @@ func GetWorker(job jobs.Job) (jobs.JobWorker, error) { } // NewServer creates a simple api for submitting and running jobs. -func NewServer(queueName string, workers int, jo jobs.JobOptions) (http.Handler, error) { +func NewServer(queueName string, workers int) (http.Handler, error) { r := chi.NewRouter() - r.HandleFunc("/add", wrapHandler(addJobRequest, jo)) - r.HandleFunc("/run", wrapHandler(runJobRequest, jo)) + r.HandleFunc("/add", wrapHandler(addJobRequest)) + r.HandleFunc("/run", wrapHandler(runJobRequest)) return r, nil } -func wrapHandler(next func(http.ResponseWriter, *http.Request, jobs.JobOptions), jo jobs.JobOptions) http.HandlerFunc { +func wrapHandler(next func(http.ResponseWriter, *http.Request)) http.HandlerFunc { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - next(w, r, jo) + next(w, r) }) } @@ -65,7 +66,7 @@ type jobResponse struct { } // addJobRequest adds the request to the appropriate queue -func addJobRequest(w http.ResponseWriter, req *http.Request, jo jobs.JobOptions) { +func addJobRequest(w http.ResponseWriter, req *http.Request) { job, err := requestGetJob(req) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -75,8 +76,8 @@ func addJobRequest(w http.ResponseWriter, req *http.Request, jo jobs.JobOptions) Job: job, } // add job to queue - jq := jo.JobQueue - if err := jq.AddJob(job); err != nil { + cfg := model.ForContext(req.Context()) + if err := cfg.JobQueue.AddJob(job); err != nil { ret.Error = err.Error() ret.Status = "failed" ret.Success = false @@ -88,7 +89,7 @@ func addJobRequest(w http.ResponseWriter, req *http.Request, jo jobs.JobOptions) } // runJobRequest runs the job directly -func runJobRequest(w http.ResponseWriter, req *http.Request, jo jobs.JobOptions) { +func runJobRequest(w http.ResponseWriter, req *http.Request) { job, err := requestGetJob(req) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -105,7 +106,6 @@ func runJobRequest(w http.ResponseWriter, req *http.Request, jo jobs.JobOptions) ret.Status = "failed" ret.Success = false } - job.Opts = jo if err := wk.Run(context.Background(), job); err != nil { ret.Error = err.Error() ret.Status = "failed"