Skip to content

Commit

Permalink
Merge pull request #160 from ivanilves/batch-delay
Browse files Browse the repository at this point in the history
Add delay between batches & one more "concurrent map write" fix
  • Loading branch information
vonrabbe authored May 11, 2018
2 parents 9f60bca + 85a2485 commit 109227f
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package registry
package container

import (
"bufio"
Expand Down Expand Up @@ -78,8 +78,8 @@ func verify(hostname string) error {
return err
}

// LaunchContainer launches a Docker container with Docker registry inside
func LaunchContainer() (*Container, error) {
// Launch launches a Docker container with Docker registry inside
func Launch() (*Container, error) {
dockerClient, _ := getDockerClient()

hostPort := getRandomPort()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package registry
package container

import (
"testing"
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestRunGuaranteedFailure(t *testing.T) {
}

func testVerify(t *testing.T) {
c, _ := LaunchContainer()
c, _ := Launch()

defer c.Destroy()

Expand All @@ -81,8 +81,8 @@ func testVerifyGuaranteedFailure(t *testing.T) {
}
}

func TestLaunchContainerAndThanDestroyIt(t *testing.T) {
c, err := LaunchContainer()
func TestLaunchAndThanDestroyIt(t *testing.T) {
c, err := Launch()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestLaunchManyContainersWithoutNamingCollisions(t *testing.T) {

for c := 0; c < createContainers; c++ {
go func() {
c, err := LaunchContainer()
c, err := Launch()
if err != nil {
done <- err
return
Expand All @@ -131,7 +131,7 @@ func TestLaunchManyContainersWithoutNamingCollisions(t *testing.T) {
}

func TestSeedContainerWithImages(t *testing.T) {
c, err := LaunchContainer()
c, err := Launch()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestSeedContainerWithImages(t *testing.T) {
}

func TestSeedContainerWithImagesGuaranteedFailure(t *testing.T) {
c, err := LaunchContainer()
c, err := Launch()
if err != nil {
t.Fatal(err)
}
Expand Down
70 changes: 45 additions & 25 deletions api/v1/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Config struct {
DockerJSONConfigFile string
// ConcurrentRequests defines how much requests to registry we could run in parallel
ConcurrentRequests int
// WaitBetween defines how much we will wait between batches of requests (incl. pull and push)
WaitBetween time.Duration
// TraceRequests sets if we will print out registry HTTP request traces
TraceRequests bool
// RetryRequests defines how much retries we will do to the failed HTTP request
Expand Down Expand Up @@ -56,6 +58,12 @@ type API struct {
dockerClient *dockerclient.DockerClient
}

// rtags is a structure to send collection of referenced tags using chan
type rtags struct {
ref string
tags []*tag.Tag
}

// fn gives the name of the calling function (e.g. enriches log.Debugf() output)
// + optionally attaches free form string labels (mainly to identify goroutines)
func fn(labels ...string) string {
Expand Down Expand Up @@ -101,6 +109,26 @@ func getBatchedSlices(batchSize int, unbatched ...string) [][]string {
return batchedSlices
}

func receiveTags(tagc chan rtags) map[string][]*tag.Tag {
tags := make(map[string][]*tag.Tag)

step := 1
size := cap(tagc)
for t := range tagc {
log.Debugf("[%s] receiving tags: %+v", t.ref, t.tags)

tags[t.ref] = t.tags

if step >= size {
close(tagc)
}

step++
}

return tags
}

// CollectTags collects information on tags present in remote registry and [local] Docker daemon,
// makes required comparisons between them and spits organized info back as collection.Collection
func (api *API) CollectTags(refs ...string) (*collection.Collection, error) {
Expand All @@ -113,15 +141,8 @@ func (api *API) CollectTags(refs ...string) (*collection.Collection, error) {
return nil, err
}

type rtags struct {
ref string
tags []*tag.Tag
}

tagc := make(chan rtags, len(refs))

tags := make(map[string][]*tag.Tag)

batchedSlicesOfRefs := getBatchedSlices(api.config.ConcurrentRequests, refs...)

for bindex, brefs := range batchedSlicesOfRefs {
Expand Down Expand Up @@ -172,22 +193,12 @@ func (api *API) CollectTags(refs ...string) (*collection.Collection, error) {
if err := wait.Until(done); err != nil {
return nil, err
}
}

step := 1
size := cap(tagc)
for t := range tagc {
log.Debugf("[%s] receiving tags: %+v", t.ref, t.tags)

tags[t.ref] = t.tags

if step >= size {
close(tagc)
}

step++
time.Sleep(api.config.WaitBetween)
}

tags := receiveTags(tagc)

log.Debugf("%s tags: %+v", fn(), tags)

return collection.New(refs, tags)
Expand Down Expand Up @@ -220,7 +231,7 @@ func (api *API) CollectPushTags(cn *collection.Collection, push PushConfig) (*co

refs := make([]string, len(cn.Refs()))
done := make(chan error, len(cn.Refs()))
tags := make(map[string][]*tag.Tag)
tagc := make(chan rtags, len(refs))

for i, repo := range cn.Repos() {
go func(repo *repository.Repository, i int, done chan error) {
Expand Down Expand Up @@ -272,19 +283,23 @@ func (api *API) CollectPushTags(cn *collection.Collection, push PushConfig) (*co
tagsToPush = append(tagsToPush, tg)
}
}
log.Debugf("%s tags to push: %+v", fn(repo.Ref()), tagsToPush)

tags[repo.Ref()] = tagsToPush
log.Debugf("%s sending 'push' tags: %+v", fn(repo.Ref()), tagsToPush)

tagc <- rtags{ref: repo.Ref(), tags: tagsToPush}
done <- nil

return
}(repo, i, done)

time.Sleep(api.config.WaitBetween)
}

if err := wait.Until(done); err != nil {
return nil, err
}

tags := receiveTags(tagc)

log.Debugf("%s 'push' tags: %+v", fn(), tags)

return collection.New(refs, tags)
Expand Down Expand Up @@ -331,6 +346,8 @@ func (api *API) PullTags(cn *collection.Collection) error {
done <- nil
}
}(repo, tags, done)

time.Sleep(api.config.WaitBetween)
}

return wait.WithTolerance(done)
Expand Down Expand Up @@ -388,6 +405,8 @@ func (api *API) PushTags(cn *collection.Collection, push PushConfig) error {
done <- err
}
}(repo, tags, done)

time.Sleep(api.config.WaitBetween)
}

return wait.WithTolerance(done)
Expand All @@ -408,9 +427,10 @@ func New(config Config) (*API, error) {
log.Debugf("%s API config: %+v", fn(), config)

if config.ConcurrentRequests == 0 {
config.ConcurrentRequests = 1
config.ConcurrentRequests = 8
}
remote.ConcurrentRequests = config.ConcurrentRequests
remote.WaitBetween = config.WaitBetween
remote.TraceRequests = config.TraceRequests
remote.RetryRequests = config.RetryRequests
remote.RetryDelay = config.RetryDelay
Expand Down
4 changes: 2 additions & 2 deletions api/v1/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/ivanilves/lstags/api/internal/registry"
registrycontainer "github.com/ivanilves/lstags/api/v1/registry/container"
"github.com/ivanilves/lstags/repository"
)

Expand All @@ -28,7 +28,7 @@ func runEnd2EndJob(pullRefs, seedRefs []string) ([]string, error) {
return nil, err
}

registryContainer, err := registry.LaunchContainer()
registryContainer, err := registrycontainer.Launch()
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Options struct {
PushPrefix string `short:"R" long:"push-prefix" description:"[Re]Push pulled images with a specified repo path prefix" env:"PUSH_PREFIX"`
PushUpdate bool `short:"U" long:"push-update" description:"Update our pushed images if remote image digest changes" env:"PUSH_UPDATE"`
ConcurrentRequests int `short:"c" long:"concurrent-requests" default:"32" description:"Limit of concurrent requests to the registry" env:"CONCURRENT_REQUESTS"`
WaitBetween time.Duration `short:"w" long:"wait-between" default:"0" description:"Time to wait between batches of requests (incl. pulls and pushes)" env:"WAIT_BETWEEN"`
RetryRequests int `short:"y" long:"retry-requests" default:"2" description:"Number of retries for failed Docker registry requests" env:"RETRY_REQUESTS"`
RetryDelay time.Duration `short:"D" long:"retry-delay" default:"30s" description:"Delay between retries of failed registry requests" env:"RETRY_DELAY"`
InsecureRegistryEx string `short:"I" long:"insecure-registry-ex" description:"Expression to match insecure registry hostnames" env:"INSECURE_REGISTRY_EX"`
Expand Down Expand Up @@ -99,6 +100,7 @@ func main() {
apiConfig := v1.Config{
DockerJSONConfigFile: o.DockerJSON,
ConcurrentRequests: o.ConcurrentRequests,
WaitBetween: o.WaitBetween,
TraceRequests: o.TraceRequests,
RetryRequests: o.RetryRequests,
RetryDelay: o.RetryDelay,
Expand Down
5 changes: 5 additions & 0 deletions tag/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
// ConcurrentRequests defines maximum number of concurrent requests we could maintain against the registry
var ConcurrentRequests = 32

// WaitBetween defines how much we will wait between batches of requests
var WaitBetween time.Duration

// RetryRequests is a number of retries we do in case of request failure
var RetryRequests = 0

Expand Down Expand Up @@ -372,6 +375,8 @@ func FetchTags(repo *repository.Repository, username, password string) (map[stri
}(repo, tagNames[tagIndex], authorization, ch)

tagIndex++

time.Sleep(WaitBetween)
}

for s := 1; s <= stepSize; s++ {
Expand Down

0 comments on commit 109227f

Please sign in to comment.