feat(scheduler): pass the shutdown/reload ctx to running tasks
Signed-off-by: Petu Eusebiu <[email protected]>
eusebiu-constantin-petu-dbk committed Aug 7, 2023
1 parent fce9a02 commit 53e86d1
Showing 14 changed files with 67 additions and 53 deletions.
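
The core of the change is the scheduler's `Task` contract: `DoWork() error` becomes `DoWork(ctx context.Context) error`, and the context passed to `RunScheduler(ctx)` is threaded through the worker pool so that a shutdown or config reload reaches tasks that are already running. A minimal, self-contained sketch of that shape (simplified; not the actual zot code, which also manages generators and task priorities):

```go
package main

import (
	"context"
	"fmt"
)

// Task mirrors the updated scheduler interface: DoWork now receives the
// shutdown/reload context instead of running unconditionally.
type Task interface {
	DoWork(ctx context.Context) error
}

type printTask struct{ msg string }

func (t *printTask) DoWork(ctx context.Context) error {
	// A well-behaved task bails out early once the context is cancelled.
	if err := ctx.Err(); err != nil {
		return err
	}

	fmt.Println(t.msg)

	return nil
}

// poolWorker is a simplified version of the pattern in scheduler.go:
// the context handed to RunScheduler is forwarded to every task.
func poolWorker(ctx context.Context, tasks <-chan Task) {
	for task := range tasks {
		if err := task.DoWork(ctx); err != nil {
			fmt.Println("task failed:", err)
		}
	}
}

func main() {
	tasks := make(chan Task, 1)
	tasks <- &printTask{msg: "hello from a context-aware task"}
	close(tasks)

	poolWorker(context.Background(), tasks)
}
```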
2 changes: 1 addition & 1 deletion pkg/cli/root.go
@@ -119,7 +119,7 @@ func newScrubCmd(conf *config.Config) *cobra.Command {
panic(err)
}

result, err := ctlr.StoreController.CheckAllBlobsIntegrity()
result, err := ctlr.StoreController.CheckAllBlobsIntegrity(cmd.Context())
if err != nil {
panic(err)
}
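
`cmd.Context()` only observes shutdown if the root command is executed with a context tied to process signals. Below is a hedged sketch of the usual cobra wiring with `signal.NotifyContext` and `ExecuteContext`; the command shown is illustrative and zot's actual signal handling may differ:

```go
package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"

	"github.com/spf13/cobra"
)

func main() {
	// Cancel this context on SIGINT/SIGTERM so cmd.Context() observes shutdown.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	rootCmd := &cobra.Command{
		Use: "example",
		RunE: func(cmd *cobra.Command, args []string) error {
			// cmd.Context() is the ctx passed to ExecuteContext below,
			// which is what the scrub command hands to CheckAllBlobsIntegrity.
			<-cmd.Context().Done()

			return cmd.Context().Err()
		},
	}

	if err := rootCmd.ExecuteContext(ctx); err != nil {
		os.Exit(1)
	}
}
```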
3 changes: 2 additions & 1 deletion pkg/extensions/extension_search.go
@@ -4,6 +4,7 @@
package extensions

import (
"context"
"net/http"
"sync"
"time"
@@ -128,7 +129,7 @@ func newTrivyTask(interval time.Duration, cveInfo cveinfo.CveInfo,
return &trivyTask{interval, cveInfo, generator, log}
}

func (trivyT *trivyTask) DoWork() error {
func (trivyT *trivyTask) DoWork(ctx context.Context) error {
trivyT.log.Info().Msg("updating the CVE database")

err := trivyT.cveInfo.UpdateDB()
9 changes: 5 additions & 4 deletions pkg/extensions/scrub/scrub.go
@@ -4,6 +4,7 @@
package scrub

import (
"context"
"fmt"
"path"

@@ -13,11 +14,11 @@ import (
)

// Scrub Extension for repo...
func RunScrubRepo(imgStore storageTypes.ImageStore, repo string, log log.Logger) error {
func RunScrubRepo(ctx context.Context, imgStore storageTypes.ImageStore, repo string, log log.Logger) error {
execMsg := fmt.Sprintf("executing scrub to check manifest/blob integrity for %s", path.Join(imgStore.RootDir(), repo))
log.Info().Msg(execMsg)

results, err := storage.CheckRepo(repo, imgStore)
results, err := storage.CheckRepo(ctx, repo, imgStore)
if err != nil {
errMessage := fmt.Sprintf("error while running scrub for %s", path.Join(imgStore.RootDir(), repo))
log.Error().Err(err).Msg(errMessage)
@@ -58,6 +59,6 @@ func NewTask(imgStore storageTypes.ImageStore, repo string, log log.Logger) *Tas
return &Task{imgStore, repo, log}
}

func (scrubT *Task) DoWork() error {
return RunScrubRepo(scrubT.imgStore, scrubT.repo, scrubT.log)
func (scrubT *Task) DoWork(ctx context.Context) error {
return RunScrubRepo(ctx, scrubT.imgStore, scrubT.repo, scrubT.log) //nolint: contextcheck
}
22 changes: 14 additions & 8 deletions pkg/extensions/sync/service.go
@@ -243,7 +243,7 @@ func (service *BaseService) SyncImage(repo, reference string) error {
service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("reference", reference).
Msg("sync: syncing image")

manifestDigest, err := service.syncTag(repo, remoteRepo, reference)
manifestDigest, err := service.syncTag(context.Background(), repo, remoteRepo, reference)
if err != nil {
return err
}
@@ -260,15 +260,15 @@ }
}

// sync repo periodically.
func (service *BaseService) SyncRepo(repo string) error {
func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
service.log.Info().Str("repo", repo).Str("registry", service.client.GetConfig().URL).
Msg("sync: syncing repo")

var err error

var tags []string

if err = retry.RetryIfNecessary(context.Background(), func() error {
if err = retry.RetryIfNecessary(ctx, func() error {
tags, err = service.remote.GetRepoTags(repo)

return err
@@ -291,14 +291,20 @@ func (service *BaseService) SyncRepo(repo string) error {
localRepo := service.contentManager.GetRepoDestination(repo)

for _, tag := range tags {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if references.IsCosignTag(tag) {
continue
}

var manifestDigest digest.Digest

if err = retry.RetryIfNecessary(context.Background(), func() error {
manifestDigest, err = service.syncTag(localRepo, repo, tag)
if err = retry.RetryIfNecessary(ctx, func() error {
manifestDigest, err = service.syncTag(ctx, localRepo, repo, tag)

return err
}, service.retryOptions); err != nil {
@@ -314,7 +320,7 @@ func (service *BaseService) SyncRepo(repo string) error {
}

if manifestDigest != "" {
if err = retry.RetryIfNecessary(context.Background(), func() error {
if err = retry.RetryIfNecessary(ctx, func() error {
err = service.references.SyncAll(localRepo, repo, manifestDigest.String())
if errors.Is(err, zerr.ErrSyncReferrerNotFound) {
return nil
@@ -335,7 +341,7 @@ func (service *BaseService) SyncRepo(repo string) error {
return nil
}

func (service *BaseService) syncTag(localRepo, remoteRepo, tag string) (digest.Digest, error) {
func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, tag string) (digest.Digest, error) {
copyOptions := getCopyOptions(service.remote.GetContext(), service.local.GetContext())

policyContext, err := getPolicyContext(service.log)
@@ -397,7 +403,7 @@ func (service *BaseService) syncTag(localRepo, remoteRepo, tag string) (digest.D
service.log.Info().Str("remote image", remoteImageRef.DockerReference().String()).
Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("syncing image")

_, err = copy.Image(context.Background(), policyContext, localImageRef, remoteImageRef, &copyOptions)
_, err = copy.Image(ctx, policyContext, localImageRef, remoteImageRef, &copyOptions)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("remote image", remoteImageRef.DockerReference().String()).
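
The `select` on `ctx.Done()` added at the top of the tag loop is the standard Go idiom for making a long-running loop responsive to shutdown: each iteration first checks for cancellation and returns `ctx.Err()` before doing more network work. A standalone sketch of the same pattern (the function and its parameters are illustrative, not from the codebase):

```go
package main

import (
	"context"
	"fmt"
)

// processAll mirrors the cancellation check added to SyncRepo's tag loop:
// it stops between items as soon as ctx is cancelled.
func processAll(ctx context.Context, items []string, process func(string) error) error {
	for _, item := range items {
		select {
		case <-ctx.Done():
			// Shutdown or reload was requested; stop early.
			return ctx.Err()
		default:
		}

		if err := process(item); err != nil {
			return err
		}
	}

	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate a shutdown that already happened

	err := processAll(ctx, []string{"a", "b"}, func(s string) error {
		fmt.Println("processing", s)
		return nil
	})
	fmt.Println("stopped with:", err) // context.Canceled
}
```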
6 changes: 3 additions & 3 deletions pkg/extensions/sync/sync.go
@@ -23,7 +23,7 @@ type Service interface {
// Get next repo from remote /v2/_catalog, will return empty string when there is no repo left.
GetNextRepo(lastRepo string) (string, error) // used by task scheduler
// Sync a repo with all of its tags and references (signatures, artifacts, sboms) into ImageStore.
SyncRepo(repo string) error // used by periodically sync
SyncRepo(ctx context.Context, repo string) error // used by periodically sync
// Sync an image (repo:tag || repo:digest) into ImageStore.
SyncImage(repo, reference string) error // used by sync on demand
// Sync a single reference for an image.
@@ -129,6 +129,6 @@ func newSyncRepoTask(repo string, service Service) *syncRepoTask {
return &syncRepoTask{repo, service}
}

func (srt *syncRepoTask) DoWork() error {
return srt.service.SyncRepo(srt.repo)
func (srt *syncRepoTask) DoWork(ctx context.Context) error {
return srt.service.SyncRepo(ctx, srt.repo)
}
2 changes: 1 addition & 1 deletion pkg/extensions/sync/sync_internal_test.go
@@ -165,7 +165,7 @@ func TestService(t *testing.T) {
service, err := New(conf, "", storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
So(err, ShouldBeNil)

err = service.SyncRepo("repo")
err = service.SyncRepo(context.Background(), "repo")
So(err, ShouldNotBeNil)
})
}
3 changes: 2 additions & 1 deletion pkg/meta/signatures/signatures.go
@@ -125,11 +125,12 @@ func NewValidityTask(metaDB mTypes.MetaDB, repo mTypes.RepoMetadata, log log.Log
return &validityTask{metaDB, repo, log}
}

func (validityT *validityTask) DoWork() error {
func (validityT *validityTask) DoWork(ctx context.Context) error {
validityT.log.Info().Msg("updating signatures validity")

for signedManifest, sigs := range validityT.repo.Signatures {
if len(sigs[CosignSignature]) != 0 || len(sigs[NotationSignature]) != 0 {
//nolint: contextcheck
err := validityT.metaDB.UpdateSignaturesValidity(validityT.repo.Name, godigest.Digest(signedManifest))
if err != nil {
validityT.log.Info().Msg("error while verifying signatures")
6 changes: 3 additions & 3 deletions pkg/scheduler/README.md
@@ -6,7 +6,7 @@ In order to create a new generator (which will generate new tasks one by one) an
```
This method should implement the logic for generating a new task.
Basically, when this method is called by the scheduler, it should return the next task until there are no more tasks to generate.
Also, the Task returned by this method should implement the DoWork() method, which contains the logic to be executed when the task is run by the scheduler.
Also, the Task returned by this method should implement the DoWork(ctx context.Context) method, which contains the logic to be executed when the task is run by the scheduler.
```
2. ***IsDone() bool***
```
@@ -31,10 +31,10 @@ Notes:

# How to submit a Task to the scheduler

In order to create a new task and add it to the scheduler, ***DoWork() error*** is the method that should be implemented. It should contain the logic to be executed when the task is run by the scheduler.
In order to create a new task and add it to the scheduler, ***DoWork(ctx context.Context) error*** is the method that should be implemented. It should contain the logic to be executed when the task is run by the scheduler.

To submit a task to the scheduler, ***SubmitTask*** should be called with the implemented task and its priority as parameters.

Note:

- A task cannot be periodic. To add a periodic task, create a generator that periodically generates the same task.
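
Tying the README's contract to the new signature, here is a hedged sketch of a one-shot task whose `DoWork(ctx)` honors the shutdown/reload context. The `Task` interface is the one from this commit; the task type, its field, and its cleanup logic are invented for illustration, and in zot such a task would be queued via the scheduler's `SubmitTask` together with a priority:

```go
package main

import (
	"context"
	"log"
	"os"
	"path/filepath"
)

// Task is the scheduler interface after this commit.
type Task interface {
	DoWork(ctx context.Context) error
}

// cleanupTask is a hypothetical one-shot task; the name, field, and logic
// are illustrative and not taken from the zot codebase.
type cleanupTask struct {
	tmpDir string
}

func (t *cleanupTask) DoWork(ctx context.Context) error {
	// Stop immediately if a shutdown/reload has already been requested.
	if err := ctx.Err(); err != nil {
		return err
	}

	log.Printf("removing temporary directory %s", t.tmpDir)

	return os.RemoveAll(t.tmpDir)
}

// In zot this task would then be submitted to the scheduler, e.g.
// scheduler.SubmitTask(&cleanupTask{tmpDir: dir}, priority), where priority
// is one of the scheduler's priority levels.

func main() {
	task := &cleanupTask{tmpDir: filepath.Join(os.TempDir(), "example-scratch")}
	if err := task.DoWork(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```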
8 changes: 4 additions & 4 deletions pkg/scheduler/scheduler.go
@@ -12,7 +12,7 @@ import (
)

type Task interface {
DoWork() error
DoWork(ctx context.Context) error
}

type generatorsPriorityQueue []*generator
@@ -97,13 +97,13 @@ func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
}
}

func (scheduler *Scheduler) poolWorker(numWorkers int, tasks chan Task) {
func (scheduler *Scheduler) poolWorker(ctx context.Context, numWorkers int, tasks chan Task) {
for i := 0; i < numWorkers; i++ {
go func(workerID int) {
for task := range tasks {
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task")

if err := task.DoWork(); err != nil {
if err := task.DoWork(ctx); err != nil {
scheduler.log.Error().Int("worker", workerID).Err(err).Msg("scheduler: error while executing task")
}

@@ -120,7 +120,7 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
tasksWorker := make(chan Task, numWorkers)

// start worker pool
go scheduler.poolWorker(numWorkers, tasksWorker)
go scheduler.poolWorker(ctx, numWorkers, tasksWorker)

go func() {
for {
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler_test.go
@@ -24,7 +24,7 @@ type task struct {

var errInternal = errors.New("task: internal error")

func (t *task) DoWork() error {
func (t *task) DoWork(ctx context.Context) error {
if t.err {
return errInternal
}
5 changes: 3 additions & 2 deletions pkg/storage/common/common.go
@@ -2,6 +2,7 @@ package storage

import (
"bytes"
"context"
"encoding/json"
"errors"
"path"
@@ -876,9 +877,9 @@ func newDedupeTask(imgStore storageTypes.ImageStore, digest godigest.Digest, ded
return &dedupeTask{imgStore, digest, duplicateBlobs, dedupe, log}
}

func (dt *dedupeTask) DoWork() error {
func (dt *dedupeTask) DoWork(ctx context.Context) error {
// run task
err := dt.imgStore.RunDedupeForDigest(dt.digest, dt.dedupe, dt.duplicateBlobs)
err := dt.imgStore.RunDedupeForDigest(dt.digest, dt.dedupe, dt.duplicateBlobs) //nolint: contextcheck
if err != nil {
// log it
dt.log.Error().Err(err).Str("digest", dt.digest.String()).Msg("rebuild dedupe: failed to rebuild digest")
4 changes: 2 additions & 2 deletions pkg/storage/local/local.go
@@ -1860,8 +1860,8 @@ func newGCTask(imgStore *ImageStoreLocal, repo string) *gcTask {
return &gcTask{imgStore, repo}
}

func (gcT *gcTask) DoWork() error {
return gcT.imgStore.RunGCRepo(gcT.repo)
func (gcT *gcTask) DoWork(ctx context.Context) error {
return gcT.imgStore.RunGCRepo(gcT.repo) //nolint: contextcheck
}

func (is *ImageStoreLocal) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest,
23 changes: 13 additions & 10 deletions pkg/storage/scrub.go
@@ -43,7 +43,7 @@ type ScrubResults struct {
ScrubResults []ScrubImageResult `json:"scrubResults"`
}

func (sc StoreController) CheckAllBlobsIntegrity() (ScrubResults, error) {
func (sc StoreController) CheckAllBlobsIntegrity(ctx context.Context) (ScrubResults, error) {
results := ScrubResults{}

imageStoreList := make(map[string]storageTypes.ImageStore)
@@ -54,7 +54,7 @@ func (sc StoreController) CheckAllBlobsIntegrity() (ScrubResults, error) {
imageStoreList[""] = sc.DefaultStore

for _, imgStore := range imageStoreList {
imgStoreResults, err := CheckImageStoreBlobsIntegrity(imgStore)
imgStoreResults, err := CheckImageStoreBlobsIntegrity(ctx, imgStore)
if err != nil {
return results, err
}
@@ -65,7 +65,7 @@ func CheckImageStoreBlobsIntegrity(imgStore storageTypes.ImageStore) ([]ScrubIma
return results, nil
}

func CheckImageStoreBlobsIntegrity(imgStore storageTypes.ImageStore) ([]ScrubImageResult, error) {
func CheckImageStoreBlobsIntegrity(ctx context.Context, imgStore storageTypes.ImageStore) ([]ScrubImageResult, error) {
results := []ScrubImageResult{}

repos, err := imgStore.GetRepositories()
@@ -74,7 +74,7 @@ func CheckImageStoreBlobsIntegrity(imgStore storageTypes.ImageStore) ([]ScrubIma
}

for _, repo := range repos {
imageResults, err := CheckRepo(repo, imgStore)
imageResults, err := CheckRepo(ctx, repo, imgStore)
if err != nil {
return results, err
}
@@ -85,20 +85,23 @@ func CheckImageStoreBlobsIntegrity(imgStore storageTypes.ImageStore) ([]ScrubIma
return results, nil
}

func CheckRepo(imageName string, imgStore storageTypes.ImageStore) ([]ScrubImageResult, error) {
func CheckRepo(ctx context.Context, imageName string, imgStore storageTypes.ImageStore) ([]ScrubImageResult, error) {
results := []ScrubImageResult{}

if ctx.Err() != nil {
return results, ctx.Err()
}

dir := path.Join(imgStore.RootDir(), imageName)
if !imgStore.DirExists(dir) {
return results, errors.ErrRepoNotFound
}

ctxUmoci := context.Background()

oci, err := umoci.OpenLayout(dir)
if err != nil {
return results, err
}

defer oci.Close()

var lockLatency time.Time
@@ -146,7 +149,7 @@ func CheckRepo(imageName string, imgStore storageTypes.ImageStore) ([]ScrubImage

for _, m := range listOfManifests {
tag := m.Annotations[ispec.AnnotationRefName]
imageResult := CheckIntegrity(ctxUmoci, imageName, tag, oci, m, dir)
imageResult := CheckIntegrity(ctx, imageName, tag, oci, m, dir)
results = append(results, imageResult)
}

@@ -160,10 +163,10 @@ func CheckIntegrity(ctx context.Context, imageName, tagName string, oci casext.E
}

// check layers
return CheckLayers(imageName, tagName, dir, manifest)
return CheckLayers(ctx, imageName, tagName, dir, manifest)
}

func CheckLayers(imageName, tagName, dir string, manifest ispec.Descriptor) ScrubImageResult {
func CheckLayers(ctx context.Context, imageName, tagName, dir string, manifest ispec.Descriptor) ScrubImageResult {
imageRes := ScrubImageResult{}

buf, err := os.ReadFile(path.Join(dir, "blobs", manifest.Digest.Algorithm().String(), manifest.Digest.Encoded()))
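
`CheckRepo` guards with a plain `ctx.Err()` check at entry, so cancellation takes effect between repositories, and the caller's `ctx` now replaces the `context.Background()` previously handed to the umoci layout calls. A hedged usage sketch that bounds a full scrub with a deadline (the import path `zotregistry.io/zot` and the helper itself are assumptions; store setup is omitted):

```go
package scrubutil

import (
	"context"
	"errors"
	"time"

	"zotregistry.io/zot/pkg/storage"
)

// ScrubWithTimeout bounds a full scrub by a deadline, relying on CheckRepo's
// ctx.Err() guard to stop between repositories once the deadline passes.
// The StoreController is assumed to be already configured by the caller.
func ScrubWithTimeout(ctx context.Context, sc storage.StoreController, limit time.Duration,
) (storage.ScrubResults, bool, error) {
	ctx, cancel := context.WithTimeout(ctx, limit)
	defer cancel()

	results, err := sc.CheckAllBlobsIntegrity(ctx)
	timedOut := errors.Is(err, context.DeadlineExceeded)

	return results, timedOut, err
}
```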