diff --git a/go.mod b/go.mod index 0112e132..1f869962 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/aws/aws-sdk-go-v2 v1.17.5 github.com/aws/aws-sdk-go-v2/config v1.18.15 github.com/aws/aws-sdk-go-v2/service/location v1.22.1 - github.com/digitalocean/go-workers2 v0.10.3 github.com/flopp/go-staticmaps v0.0.0-20220221183018-c226716bec53 github.com/go-chi/chi v1.5.4 github.com/go-chi/chi/v5 v5.0.10 @@ -22,7 +21,7 @@ require ( github.com/hypirion/go-filecache v0.0.0-20160810125507-e3e6ef6981f0 github.com/interline-io/log v0.0.0-20231211003339-8bdc406adcd2 github.com/interline-io/transitland-lib v0.14.0-rc1.0.20231202005632-a9ea742322f7 - github.com/interline-io/transitland-mw v0.0.0-20231211012518-4cd4a8535e63 + github.com/interline-io/transitland-mw v0.0.0-20231220044449-a6aac07a9d9e github.com/jellydator/ttlcache/v2 v2.11.1 github.com/jmoiron/sqlx v1.3.5 github.com/lib/pq v1.10.7 @@ -69,6 +68,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/digitalocean/go-workers2 v0.10.4 // indirect github.com/dimchansky/utfbom v1.1.1 // indirect github.com/flopp/go-coordsparser v0.0.0-20201115094714-8baaeb7062d5 // indirect github.com/fogleman/gg v1.3.0 // indirect @@ -125,3 +125,4 @@ require ( ) // replace github.com/interline-io/transitland-lib => /Users/irees/src/interline-io/transitland-lib +// replace github.com/interline-io/transitland-mw => /Users/irees/src/interline-io/transitland-mw diff --git a/go.sum b/go.sum index 01ebaec4..999bc0a7 100644 --- a/go.sum +++ b/go.sum @@ -134,8 +134,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= -github.com/digitalocean/go-workers2 v0.10.3 h1:qs7MELMMQybjSxmvFqvedXJAqkF7f5vC+QnVa1dmUwA= -github.com/digitalocean/go-workers2 v0.10.3/go.mod h1:4F0tUdjgqV9ZOGCTmn5Cr58CVshMI5o0SmNx6e5T98o= +github.com/digitalocean/go-workers2 v0.10.4 h1:3GTz14m2gaPPzUEEXBLLK0Y/jyWTYi5QKcSbrCbyIJQ= +github.com/digitalocean/go-workers2 v0.10.4/go.mod h1:4F0tUdjgqV9ZOGCTmn5Cr58CVshMI5o0SmNx6e5T98o= github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U= github.com/dimchansky/utfbom v1.1.1/go.mod h1:SxdoEBH5qIqFocHMyGOXVAybYJdr71b1Q/j0mACtrfE= github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= @@ -272,8 +272,10 @@ github.com/interline-io/log v0.0.0-20231211003339-8bdc406adcd2 h1:ScRM8Kr6UwAvbl github.com/interline-io/log v0.0.0-20231211003339-8bdc406adcd2/go.mod h1:chJaM8SKcHI6ivoeFuZ8M8axTjSV4TPmuQ+sAyAHa34= github.com/interline-io/transitland-lib v0.14.0-rc1.0.20231202005632-a9ea742322f7 h1:rwkKzYzl05Q4TM++L9RIJbXPjIvURoMN5vEr8dvUV/Q= github.com/interline-io/transitland-lib v0.14.0-rc1.0.20231202005632-a9ea742322f7/go.mod h1:UcfuCX6DyKt/yn5GECFn3jQ6NcZEjt5XyPjf8a3tXZ4= -github.com/interline-io/transitland-mw v0.0.0-20231211012518-4cd4a8535e63 h1:ZaqjWLLHAUcg6giXwdnDvzqlQJxYavG5JRdfm+zvZ3c= -github.com/interline-io/transitland-mw v0.0.0-20231211012518-4cd4a8535e63/go.mod h1:4rAXsCPb7miaAmLOoKjWZSBVCUBiTU5KdisG8orDyYQ= +github.com/interline-io/transitland-mw v0.0.0-20231220044108-68bdd4c22c0c h1:orFgIOZXD989tkOBASbU/u2ksHLTYKMrOdKBk+U1wlY= +github.com/interline-io/transitland-mw v0.0.0-20231220044108-68bdd4c22c0c/go.mod h1:QyssKyKXwbTWhFT9O08nVj+gScPntMvLh1qMGNr1Ksw= +github.com/interline-io/transitland-mw v0.0.0-20231220044449-a6aac07a9d9e h1:MZnYbL21CKMWQkNBaG6xnWjU7J7uXErlBM4RwXjIfRQ= +github.com/interline-io/transitland-mw v0.0.0-20231220044449-a6aac07a9d9e/go.mod h1:QyssKyKXwbTWhFT9O08nVj+gScPntMvLh1qMGNr1Ksw= github.com/jarcoal/httpmock v1.3.1 h1:iUx3whfZWVf3jT01hQTO/Eo5sAYtB2/rqaUuOtpInww= github.com/jarcoal/httpmock v1.3.1/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg= github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4= diff --git a/jobs/jobs.go b/jobs/jobs.go deleted file mode 100644 index 4f302dda..00000000 --- a/jobs/jobs.go +++ /dev/null @@ -1,59 +0,0 @@ -package jobs - -import ( - "context" - "crypto/sha1" - "encoding/hex" - "encoding/json" - - "github.com/interline-io/transitland-lib/tl" - "github.com/rs/zerolog" -) - -type JobArgs map[string]interface{} - -// Job queue -type JobQueue interface { - AddJob(Job) error - AddWorker(string, GetWorker, JobOptions, int) error - Use(JobMiddleware) - Run() error - Stop() error -} - -// Job defines a single job -type Job struct { - Queue string `json:"queue"` - JobType string `json:"job_type"` - JobArgs JobArgs `json:"job_args"` - Unique bool `json:"unique"` - JobDeadline int64 `json:"job_deadline"` - Opts JobOptions `json:"-"` - jobId string `json:"-"` -} - -func (job *Job) HexKey() (string, error) { - bytes, err := json.Marshal(job.JobArgs) - if err != nil { - return "", err - } - sum := sha1.Sum(bytes) - return job.JobType + ":" + hex.EncodeToString(sum[:]), nil -} - -// JobOptions is configuration passed to worker. -type JobOptions struct { - JobQueue JobQueue - Logger zerolog.Logger - Secrets []tl.Secret -} - -// GetWorker returns a new worker for this job type -type GetWorker func(Job) (JobWorker, error) - -// JobWorker defines a job worker -type JobWorker interface { - Run(context.Context, Job) error -} - -type JobMiddleware func(JobWorker) JobWorker diff --git a/jobs/jobs_test.go b/jobs/jobs_test.go deleted file mode 100644 index 6861d840..00000000 --- a/jobs/jobs_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package jobs - -import ( - "context" - "sync/atomic" - "time" -) - -var ( - feeds = []string{"BA", "SF", "AC", "CT"} -) - -type testWorker struct { - count *int64 -} - -func (t *testWorker) Run(ctx context.Context, job Job) error { - time.Sleep(10 * time.Millisecond) - atomic.AddInt64(t.count, 1) - return nil -} diff --git a/jobs/local_jobs.go b/jobs/local_jobs.go deleted file mode 100644 index 3291ddc5..00000000 --- a/jobs/local_jobs.go +++ /dev/null @@ -1,147 +0,0 @@ -package jobs - -import ( - "context" - "errors" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/interline-io/log" -) - -var jobCounter = uint64(0) - -type LocalJobs struct { - jobs chan Job - jobfuncs []func(Job) error - running bool - middlewares []JobMiddleware - uniqueJobs map[string]bool - uniqueJobsLock sync.Mutex -} - -func NewLocalJobs() *LocalJobs { - f := &LocalJobs{ - jobs: make(chan Job, 1000), - uniqueJobs: map[string]bool{}, - } - f.middlewares = append(f.middlewares, newLog()) - return f -} - -func (f *LocalJobs) Use(mwf JobMiddleware) { - f.middlewares = append(f.middlewares, mwf) -} - -func (f *LocalJobs) AddJob(job Job) error { - if f.jobs == nil { - return errors.New("closed") - } - if job.Unique { - f.uniqueJobsLock.Lock() - defer f.uniqueJobsLock.Unlock() - key, err := job.HexKey() - if err != nil { - return err - } - if _, ok := f.uniqueJobs[key]; ok { - log.Trace().Interface("job", job).Msgf("already locked: %s", key) - return nil - } else { - f.uniqueJobs[key] = true - log.Trace().Interface("job", job).Msgf("locked: %s", key) - } - } - f.jobs <- job - log.Info().Interface("job", job).Msg("jobs: added job") - return nil -} - -func (f *LocalJobs) processMessage(getWorker GetWorker, jo JobOptions, job Job) error { - job = Job{ - JobType: job.JobType, - JobArgs: job.JobArgs, - JobDeadline: job.JobDeadline, - Unique: job.Unique, - Opts: jo, - jobId: fmt.Sprintf("%d", atomic.AddUint64(&jobCounter, 1)), - } - now := time.Now().In(time.UTC).Unix() - if job.JobDeadline > 0 && job.JobDeadline < now { - log.Trace().Int64("job_deadline", job.JobDeadline).Int64("now", now).Msg("job skipped - deadline in past") - return nil - } - if job.Unique { - f.uniqueJobsLock.Lock() - defer f.uniqueJobsLock.Unlock() - key, err := job.HexKey() - if err != nil { - return err - } - delete(f.uniqueJobs, key) - log.Trace().Interface("job", job).Msgf("unlocked: %s", key) - } - w, err := getWorker(job) - if err != nil { - return err - } - if w == nil { - return errors.New("no job") - } - for _, mwf := range f.middlewares { - w = mwf(w) - if w == nil { - return errors.New("no job") - } - } - return w.Run(context.TODO(), job) -} - -func (f *LocalJobs) AddWorker(queue string, getWorker GetWorker, jo JobOptions, count int) error { - if f.running { - return errors.New("already running") - } - processMessage := func(job Job) error { - return f.processMessage(getWorker, jo, job) - } - log.Infof("jobs: created job listener") - for i := 0; i < count; i++ { - f.jobfuncs = append(f.jobfuncs, processMessage) - } - return nil -} - -func (f *LocalJobs) Run() error { - if f.running { - return errors.New("already running") - } - log.Infof("jobs: running") - f.running = true - var wg sync.WaitGroup - for _, jobfunc := range f.jobfuncs { - wg.Add(1) - go func(jf func(Job) error, w *sync.WaitGroup) { - for job := range f.jobs { - if err := jf(job); err != nil { - log.Trace().Err(err).Msg("job failed") - } - } - wg.Done() - }(jobfunc, &wg) - } - wg.Wait() - return nil -} - -func (f *LocalJobs) Stop() error { - if !f.running { - return errors.New("not running") - } - log.Infof("jobs: stopping") - close(f.jobs) - f.running = false - f.jobs = nil - return nil -} diff --git a/jobs/local_test.go b/jobs/local_test.go deleted file mode 100644 index dc89f8bb..00000000 --- a/jobs/local_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package jobs - -import ( - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestLocalJobs(t *testing.T) { - t.Run("simple", func(t *testing.T) { - rtJobs := NewLocalJobs() - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 1) - for _, feed := range feeds { - rtJobs.AddJob(Job{JobType: "test", Unique: false, JobArgs: JobArgs{"feed_id": feed}}) - } - go func() { - time.Sleep(200 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, len(feeds), int(count)) - }) - t.Run("unique", func(t *testing.T) { - rtJobs := NewLocalJobs() - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - for i := 0; i < 10; i++ { - // 1 job: j=0 - for j := 0; j < 10; j++ { - job := Job{JobType: "testUnique", Unique: true, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/10)}} - rtJobs.AddJob(job) - } - // 3 jobs; j=3, j=6, j=9... j=0 is not unique - for j := 0; j < 10; j++ { - job := Job{JobType: "testUnique", Unique: true, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/3)}} - rtJobs.AddJob(job) - } - // 10 jobs: j=0, j=0, j=2, j=2, j=4, j=4, j=6 j=6, j=8, j=8 - for j := 0; j < 10; j++ { - job := Job{JobType: "testNotUnique", Unique: false, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/2)}} - rtJobs.AddJob(job) - } - } - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 4) - go func() { - time.Sleep(1000 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, int64(104), count) - }) - t.Run("deadline", func(t *testing.T) { - rtJobs := NewLocalJobs() - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: 0}) - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: time.Now().Add(1 * time.Hour).Unix()}) - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: time.Now().Add(-1 * time.Hour).Unix()}) - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 1) - go func() { - time.Sleep(100 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, int64(2), count) - }) - -} diff --git a/jobs/log.go b/jobs/log.go deleted file mode 100644 index c5391652..00000000 --- a/jobs/log.go +++ /dev/null @@ -1,31 +0,0 @@ -package jobs - -import ( - "context" - "time" - - "github.com/interline-io/log" -) - -type jlog struct { - JobWorker -} - -func (w *jlog) Run(ctx context.Context, job Job) error { - t1 := time.Now() - job.Opts.Logger = log.Logger.With().Str("job_type", job.JobType).Str("job_id", job.jobId).Logger() - job.Opts.Logger.Info().Msg("job: started") - if err := w.JobWorker.Run(ctx, job); err != nil { - job.Opts.Logger.Error().Err(err).Msg("job: error") - return err - } - job.Opts.Logger.Info().Int64("job_time_ms", (time.Now().UnixNano()-t1.UnixNano())/1e6).Msg("job: completed") - return nil - -} - -func newLog() JobMiddleware { - return func(w JobWorker) JobWorker { - return &jlog{JobWorker: w} - } -} diff --git a/jobs/redis_jobs.go b/jobs/redis_jobs.go deleted file mode 100644 index 59743041..00000000 --- a/jobs/redis_jobs.go +++ /dev/null @@ -1,185 +0,0 @@ -package jobs - -import ( - "context" - "errors" - "fmt" - "os" - "strconv" - "time" - - workers "github.com/digitalocean/go-workers2" - "github.com/go-redis/redis/v8" - - "github.com/interline-io/log" -) - -// RedisJobs is a simple wrapper around go-workers -type RedisJobs struct { - queuePrefix string - producer *workers.Producer - manager *workers.Manager - client *redis.Client - middlewares []JobMiddleware -} - -func NewRedisJobs(client *redis.Client, queuePrefix string) *RedisJobs { - f := RedisJobs{ - queuePrefix: queuePrefix, - client: client, - } - f.Use(newLog()) - return &f -} - -type redisJob struct { - JobType string `json:"job_type"` - JobArgs JobArgs `json:"job_args"` - JobDeadline int64 `json:"job_deadline"` - Unique bool `json:"unique"` -} - -func (f *RedisJobs) Use(mwf JobMiddleware) { - f.middlewares = append(f.middlewares, mwf) -} - -func (f *RedisJobs) AddJob(job Job) error { - if f.producer == nil { - var err error - f.producer, err = workers.NewProducerWithRedisClient(workers.Options{ - ProcessID: strconv.Itoa(os.Getpid()), - }, f.client) - if err != nil { - return err - } - } - if job.Unique { - key, err := job.HexKey() - if err != nil { - return err - } - fullKey := fmt.Sprintf("queue:%s:unique:%s", f.queueName(job.Queue), key) - deadlineTtl := time.Duration(60*60) * time.Second - if sec := job.JobDeadline - time.Now().In(time.UTC).Unix(); sec > 0 { - deadlineTtl = time.Duration(sec) * time.Second - } - logMsg := log.Trace().Interface("job", job).Str("key", fullKey).Float64("ttl", deadlineTtl.Seconds()) - if !f.client.SetNX(context.Background(), fullKey, "unique", deadlineTtl).Val() { - logMsg.Msg("unique job already locked") - return nil - } else { - logMsg.Msg("unique job locked") - } - } - rjob := redisJob{ - JobType: job.JobType, - JobArgs: job.JobArgs, - Unique: job.Unique, - JobDeadline: job.JobDeadline, - } - _, err := f.producer.Enqueue(f.queueName(job.Queue), rjob.JobType, rjob) - return err -} - -func (f *RedisJobs) queueName(q string) string { - if q == "" { - q = "default" - } - return f.queuePrefix + q -} - -func (f *RedisJobs) getManager() (*workers.Manager, error) { - var err error - if f.manager == nil { - f.manager, err = workers.NewManagerWithRedisClient(workers.Options{ - ProcessID: strconv.Itoa(os.Getpid()), - }, f.client) - } - return f.manager, err -} - -func (f *RedisJobs) processMessage(queueName string, getWorker GetWorker, jo JobOptions, msg *workers.Msg) error { - j := msg.Args() - job := Job{ - JobType: msg.Class(), - jobId: msg.Jid(), - Queue: queueName, - Opts: jo, - } - job.JobArgs, _ = j.Get("job_args").Map() - job.JobDeadline, _ = j.Get("job_deadline").Int64() - job.Unique, _ = j.Get("unique").Bool() - now := time.Now().In(time.UTC).Unix() - if job.Unique { - // Consider more advanced locking options - key, err := job.HexKey() - if err != nil { - return err - } - fullKey := fmt.Sprintf("queue:%s:unique:%s", f.queueName(job.Queue), key) - ctx := context.Background() - logMsg := log.Trace().Str("key", fullKey) - defer func() { - if result, err := f.client.Del(ctx, fullKey).Result(); err != nil { - logMsg.Err(err).Msg("error unlocking job!") - } else { - logMsg.Int64("result", result).Msg("unique job unlocked") - } - }() - } - if job.JobDeadline > 0 && now > job.JobDeadline { - log.Trace().Int64("job_deadline", job.JobDeadline).Int64("now", now).Msg("job skipped - deadline in past") - return nil - } - w, err := getWorker(job) - if err != nil { - return err - } - if w == nil { - return errors.New("no job") - } - for _, mwf := range f.middlewares { - w = mwf(w) - if w == nil { - return errors.New("no job") - } - } - if err := w.Run(context.TODO(), job); err != nil { - log.Trace().Err(err).Msg("job failed") - } - return nil -} - -func (f *RedisJobs) AddWorker(queue string, getWorker GetWorker, jo JobOptions, count int) error { - manager, err := f.getManager() - if err != nil { - return err - } - processMessage := func(msg *workers.Msg) error { - return f.processMessage(queue, getWorker, jo, msg) - } - if queue == "" { - queue = "default" - } - manager.AddWorker(f.queueName(queue), count, processMessage) - return nil -} - -func (f *RedisJobs) Run() error { - log.Infof("jobs: running") - manager, err := f.getManager() - if err == nil { - // Blocks - manager.Run() - } - return err -} - -func (f *RedisJobs) Stop() error { - log.Infof("jobs: stopping") - manager, err := f.getManager() - if err == nil { - manager.Stop() - } - return err -} diff --git a/jobs/redis_test.go b/jobs/redis_test.go deleted file mode 100644 index f9217d27..00000000 --- a/jobs/redis_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package jobs - -import ( - "fmt" - "os" - "testing" - "time" - - "github.com/interline-io/transitland-server/internal/testutil" - "github.com/stretchr/testify/assert" -) - -func TestRedisJobs(t *testing.T) { - // redis jobs and cache - if a, ok := testutil.CheckTestRedisClient(); !ok { - t.Skip(a) - return - } - client := testutil.MustOpenTestRedisClient() - - t.Run("simple", func(t *testing.T) { - // Ugly :( - rtJobs := NewRedisJobs(client, fmt.Sprintf("queue1:%d:%d", os.Getpid(), time.Now().UnixNano())) - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 1) - for _, feed := range feeds { - rtJobs.AddJob(Job{JobType: "test", Unique: false, JobArgs: JobArgs{"feed_id": feed}}) - } - go func() { - time.Sleep(200 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, len(feeds), int(count)) - }) - t.Run("unique", func(t *testing.T) { - // Abuse the job queue - rtJobs := NewRedisJobs(client, fmt.Sprintf("queue2:%d:%d", os.Getpid(), time.Now().UnixNano())) - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - for i := 0; i < 10; i++ { - // 1 job: j=0 - for j := 0; j < 10; j++ { - job := Job{JobType: "testUnique", Unique: true, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/10)}} - rtJobs.AddJob(job) - } - // 3 jobs; j=3, j=6, j=9... j=0 is not unique - for j := 0; j < 10; j++ { - job := Job{JobType: "testUnique", Unique: true, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/3)}} - rtJobs.AddJob(job) - } - // 10 jobs: j=0, j=0, j=2, j=2, j=4, j=4, j=6 j=6, j=8, j=8 - for j := 0; j < 10; j++ { - job := Job{JobType: "testNotUnique", Unique: false, JobArgs: JobArgs{"test": fmt.Sprintf("n:%d", j/2)}} - rtJobs.AddJob(job) - } - } - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 4) - go func() { - time.Sleep(1000 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, int64(104), count) - }) - t.Run("deadline", func(t *testing.T) { - rtJobs := NewRedisJobs(client, fmt.Sprintf("queue3:%d:%d", os.Getpid(), time.Now().UnixNano())) - count := int64(0) - testGetWorker := func(job Job) (JobWorker, error) { - w := testWorker{count: &count} - return &w, nil - } - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: 0}) - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: time.Now().Add(1 * time.Hour).Unix()}) - rtJobs.AddJob(Job{JobType: "testUnique", Unique: false, JobArgs: JobArgs{"test": "test"}, JobDeadline: time.Now().Add(-1 * time.Hour).Unix()}) - rtJobs.AddWorker("", testGetWorker, JobOptions{}, 1) - go func() { - time.Sleep(100 * time.Millisecond) - rtJobs.Stop() - }() - rtJobs.Run() - assert.Equal(t, int64(2), count) - }) -} diff --git a/server/server_cmd.go b/server/server_cmd.go index 76e643b0..061001d7 100644 --- a/server/server_cmd.go +++ b/server/server_cmd.go @@ -25,6 +25,7 @@ import ( "github.com/interline-io/transitland-lib/tl" "github.com/interline-io/transitland-mw/auth/ancheck" "github.com/interline-io/transitland-mw/auth/azcheck" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-mw/lmw" "github.com/interline-io/transitland-mw/meters" "github.com/interline-io/transitland-mw/metrics" @@ -32,7 +33,6 @@ import ( "github.com/interline-io/transitland-server/finders/gbfsfinder" "github.com/interline-io/transitland-server/finders/rtfinder" "github.com/interline-io/transitland-server/internal/dbutil" - "github.com/interline-io/transitland-server/jobs" "github.com/interline-io/transitland-server/model" "github.com/interline-io/transitland-server/server/gql" "github.com/interline-io/transitland-server/server/playground" diff --git a/workers/fetch_enqueue_worker.go b/workers/fetch_enqueue_worker.go index 100a109f..9fac6496 100644 --- a/workers/fetch_enqueue_worker.go +++ b/workers/fetch_enqueue_worker.go @@ -5,8 +5,8 @@ import ( "time" "github.com/interline-io/transitland-lib/tl" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/actions" - "github.com/interline-io/transitland-server/jobs" "github.com/interline-io/transitland-server/model" ) diff --git a/workers/gbfs_fetch_worker.go b/workers/gbfs_fetch_worker.go index e1a473b0..7f304c64 100644 --- a/workers/gbfs_fetch_worker.go +++ b/workers/gbfs_fetch_worker.go @@ -7,8 +7,8 @@ import ( "time" "github.com/interline-io/transitland-lib/tldb" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/internal/gbfs" - "github.com/interline-io/transitland-server/jobs" "github.com/interline-io/transitland-server/model" ) diff --git a/workers/gbfs_fetch_worker_test.go b/workers/gbfs_fetch_worker_test.go index a6936244..f214858d 100644 --- a/workers/gbfs_fetch_worker_test.go +++ b/workers/gbfs_fetch_worker_test.go @@ -5,10 +5,10 @@ import ( "net/http/httptest" "testing" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/internal/gbfs" "github.com/interline-io/transitland-server/internal/testconfig" "github.com/interline-io/transitland-server/internal/testutil" - "github.com/interline-io/transitland-server/jobs" "github.com/stretchr/testify/assert" "github.com/interline-io/transitland-server/model" diff --git a/workers/rt_fetch_worker.go b/workers/rt_fetch_worker.go index 2e515beb..cd113859 100644 --- a/workers/rt_fetch_worker.go +++ b/workers/rt_fetch_worker.go @@ -3,8 +3,8 @@ package workers import ( "context" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/actions" - "github.com/interline-io/transitland-server/jobs" ) type RTFetchWorker struct { diff --git a/workers/static_fetch_worker.go b/workers/static_fetch_worker.go index b0ba4b9c..797f8dc9 100644 --- a/workers/static_fetch_worker.go +++ b/workers/static_fetch_worker.go @@ -4,8 +4,8 @@ import ( "context" "errors" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/actions" - "github.com/interline-io/transitland-server/jobs" ) type StaticFetchWorker struct { diff --git a/workers/test_worker.go b/workers/test_worker.go index 2412af25..aa5dd447 100644 --- a/workers/test_worker.go +++ b/workers/test_worker.go @@ -4,7 +4,7 @@ import ( "context" "errors" - "github.com/interline-io/transitland-server/jobs" + "github.com/interline-io/transitland-mw/jobs" ) type testOkWorker struct{} diff --git a/workers/workers.go b/workers/workers.go index 9f06bf18..a40f70b6 100644 --- a/workers/workers.go +++ b/workers/workers.go @@ -7,8 +7,8 @@ import ( "net/http" "github.com/go-chi/chi/v5" + "github.com/interline-io/transitland-mw/jobs" "github.com/interline-io/transitland-server/internal/util" - "github.com/interline-io/transitland-server/jobs" ) // GetWorker returns the correct worker type for this job.