Skip to content

Commit

Permalink
feat(scheduler): pass the shutdown/reload ctx to running tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Aug 4, 2023
1 parent 2142055 commit 75a23cd
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 28 deletions.
3 changes: 2 additions & 1 deletion pkg/extensions/extension_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package extensions

import (
"context"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -128,7 +129,7 @@ func newTrivyTask(interval time.Duration, cveInfo cveinfo.CveInfo,
return &trivyTask{interval, cveInfo, generator, log}
}

func (trivyT *trivyTask) DoWork() error {
func (trivyT *trivyTask) DoWork(ctx context.Context) error {
trivyT.log.Info().Msg("updating the CVE database")

err := trivyT.cveInfo.UpdateDB()
Expand Down
5 changes: 3 additions & 2 deletions pkg/extensions/scrub/scrub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package scrub

import (
"context"
"fmt"
"path"

Expand Down Expand Up @@ -58,6 +59,6 @@ func NewTask(imgStore storageTypes.ImageStore, repo string, log log.Logger) *Tas
return &Task{imgStore, repo, log}
}

func (scrubT *Task) DoWork() error {
return RunScrubRepo(scrubT.imgStore, scrubT.repo, scrubT.log)
func (scrubT *Task) DoWork(ctx context.Context) error {
return RunScrubRepo(scrubT.imgStore, scrubT.repo, scrubT.log) //nolint: contextcheck
}
22 changes: 14 additions & 8 deletions pkg/extensions/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (service *BaseService) SyncImage(repo, reference string) error {
service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("reference", reference).
Msg("sync: syncing image")

manifestDigest, err := service.syncTag(repo, remoteRepo, reference)
manifestDigest, err := service.syncTag(context.Background(), repo, remoteRepo, reference)
if err != nil {
return err
}
Expand All @@ -260,15 +260,15 @@ func (service *BaseService) SyncImage(repo, reference string) error {
}

// sync repo periodically.
func (service *BaseService) SyncRepo(repo string) error {
func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
service.log.Info().Str("repo", repo).Str("registry", service.client.GetConfig().URL).
Msg("sync: syncing repo")

var err error

var tags []string

if err = retry.RetryIfNecessary(context.Background(), func() error {
if err = retry.RetryIfNecessary(ctx, func() error {
tags, err = service.remote.GetRepoTags(repo)

return err
Expand All @@ -291,14 +291,20 @@ func (service *BaseService) SyncRepo(repo string) error {
localRepo := service.contentManager.GetRepoDestination(repo)

for _, tag := range tags {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if references.IsCosignTag(tag) {
continue
}

var manifestDigest digest.Digest

if err = retry.RetryIfNecessary(context.Background(), func() error {
manifestDigest, err = service.syncTag(localRepo, repo, tag)
if err = retry.RetryIfNecessary(ctx, func() error {
manifestDigest, err = service.syncTag(ctx, localRepo, repo, tag)

return err
}, service.retryOptions); err != nil {
Expand All @@ -314,7 +320,7 @@ func (service *BaseService) SyncRepo(repo string) error {
}

if manifestDigest != "" {
if err = retry.RetryIfNecessary(context.Background(), func() error {
if err = retry.RetryIfNecessary(ctx, func() error {
err = service.references.SyncAll(localRepo, repo, manifestDigest.String())
if errors.Is(err, zerr.ErrSyncReferrerNotFound) {
return nil
Expand All @@ -335,7 +341,7 @@ func (service *BaseService) SyncRepo(repo string) error {
return nil
}

func (service *BaseService) syncTag(localRepo, remoteRepo, tag string) (digest.Digest, error) {
func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, tag string) (digest.Digest, error) {
copyOptions := getCopyOptions(service.remote.GetContext(), service.local.GetContext())

policyContext, err := getPolicyContext(service.log)
Expand Down Expand Up @@ -397,7 +403,7 @@ func (service *BaseService) syncTag(localRepo, remoteRepo, tag string) (digest.D
service.log.Info().Str("remote image", remoteImageRef.DockerReference().String()).
Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("syncing image")

_, err = copy.Image(context.Background(), policyContext, localImageRef, remoteImageRef, &copyOptions)
_, err = copy.Image(ctx, policyContext, localImageRef, remoteImageRef, &copyOptions)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("remote image", remoteImageRef.DockerReference().String()).
Expand Down
6 changes: 3 additions & 3 deletions pkg/extensions/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Service interface {
// Get next repo from remote /v2/_catalog, will return empty string when there is no repo left.
GetNextRepo(lastRepo string) (string, error) // used by task scheduler
// Sync a repo with all of its tags and references (signatures, artifacts, sboms) into ImageStore.
SyncRepo(repo string) error // used by periodically sync
SyncRepo(ctx context.Context, repo string) error // used by periodically sync
// Sync an image (repo:tag || repo:digest) into ImageStore.
SyncImage(repo, reference string) error // used by sync on demand
// Sync a single reference for an image.
Expand Down Expand Up @@ -129,6 +129,6 @@ func newSyncRepoTask(repo string, service Service) *syncRepoTask {
return &syncRepoTask{repo, service}
}

func (srt *syncRepoTask) DoWork() error {
return srt.service.SyncRepo(srt.repo)
func (srt *syncRepoTask) DoWork(ctx context.Context) error {
return srt.service.SyncRepo(ctx, srt.repo)
}
2 changes: 1 addition & 1 deletion pkg/extensions/sync/sync_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestService(t *testing.T) {
service, err := New(conf, "", storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
So(err, ShouldBeNil)

err = service.SyncRepo("repo")
err = service.SyncRepo(context.Background(), "repo")
So(err, ShouldNotBeNil)
})
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/meta/signatures/signatures.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,12 @@ func NewValidityTask(metaDB mTypes.MetaDB, repo mTypes.RepoMetadata, log log.Log
return &validityTask{metaDB, repo, log}
}

func (validityT *validityTask) DoWork() error {
func (validityT *validityTask) DoWork(ctx context.Context) error {
validityT.log.Info().Msg("updating signatures validity")

for signedManifest, sigs := range validityT.repo.Signatures {
if len(sigs[CosignSignature]) != 0 || len(sigs[NotationSignature]) != 0 {
//nolint: contextcheck
err := validityT.metaDB.UpdateSignaturesValidity(validityT.repo.Name, godigest.Digest(signedManifest))
if err != nil {
validityT.log.Info().Msg("error while verifying signatures")
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ In order to create a new generator (which will generate new tasks one by one) an
```
This method should implement the logic for generating a new task.
Basically, when this method is called by the scheduler it should return the next task until there are no more tasks to be generated.
Also, the Task returned by this method should implement DoWork() method which should contain the logic that should be executed when this task is run by the scheduler.
Also, the Task returned by this method should implement DoWork(ctx context.Context) method which should contain the logic that should be executed when this task is run by the scheduler.
```
2. ***IsDone() bool***
```
Expand All @@ -31,10 +31,10 @@ Notes:

# How to submit a Task to the scheduler

In order to create a new task and add it to the scheduler ***DoWork() error*** is the method that should be implemented. This should contain the logic that should be executed when this task is run by the scheduler.
In order to create a new task and add it to the scheduler ***DoWork(ctx context.Context) error*** is the method that should be implemented. This should contain the logic that should be executed when this task is run by the scheduler.

To submit a task to the scheduler ***SubmitTask*** should be called with the implemented task and the priority of the task as parameters.

Note:

- A task can not be periodic. In order to add a periodic task, it can be created a generator which will generate periodically the same task.
- A task can not be periodic. In order to add a periodic task, it can be created a generator which will generate periodically the same task.
8 changes: 4 additions & 4 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type Task interface {
DoWork() error
DoWork(ctx context.Context) error
}

type generatorsPriorityQueue []*generator
Expand Down Expand Up @@ -97,13 +97,13 @@ func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
}
}

func (scheduler *Scheduler) poolWorker(numWorkers int, tasks chan Task) {
func (scheduler *Scheduler) poolWorker(ctx context.Context, numWorkers int, tasks chan Task) {
for i := 0; i < numWorkers; i++ {
go func(workerID int) {
for task := range tasks {
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task")

if err := task.DoWork(); err != nil {
if err := task.DoWork(ctx); err != nil {
scheduler.log.Error().Int("worker", workerID).Err(err).Msg("scheduler: error while executing task")
}

Expand All @@ -120,7 +120,7 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
tasksWorker := make(chan Task, numWorkers)

// start worker pool
go scheduler.poolWorker(numWorkers, tasksWorker)
go scheduler.poolWorker(ctx, numWorkers, tasksWorker)

go func() {
for {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type task struct {

var errInternal = errors.New("task: internal error")

func (t *task) DoWork() error {
func (t *task) DoWork(ctx context.Context) error {
if t.err {
return errInternal
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"bytes"
"context"
"encoding/json"
"errors"
"path"
Expand Down Expand Up @@ -876,9 +877,9 @@ func newDedupeTask(imgStore storageTypes.ImageStore, digest godigest.Digest, ded
return &dedupeTask{imgStore, digest, duplicateBlobs, dedupe, log}
}

func (dt *dedupeTask) DoWork() error {
func (dt *dedupeTask) DoWork(ctx context.Context) error {
// run task
err := dt.imgStore.RunDedupeForDigest(dt.digest, dt.dedupe, dt.duplicateBlobs)
err := dt.imgStore.RunDedupeForDigest(dt.digest, dt.dedupe, dt.duplicateBlobs) //nolint: contextcheck
if err != nil {
// log it
dt.log.Error().Err(err).Str("digest", dt.digest.String()).Msg("rebuild dedupe: failed to rebuild digest")
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1860,8 +1860,8 @@ func newGCTask(imgStore *ImageStoreLocal, repo string) *gcTask {
return &gcTask{imgStore, repo}
}

func (gcT *gcTask) DoWork() error {
return gcT.imgStore.RunGCRepo(gcT.repo)
func (gcT *gcTask) DoWork(ctx context.Context) error {
return gcT.imgStore.RunGCRepo(gcT.repo) //nolint: contextcheck
}

func (is *ImageStoreLocal) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest,
Expand Down

0 comments on commit 75a23cd

Please sign in to comment.