diff --git a/README.md b/README.md index c371d8f..739c78c 100644 --- a/README.md +++ b/README.md @@ -28,19 +28,16 @@ You could use `lstags`, if you ... * ... poll registry for the new images pushed (to take some action afterwards, run CI for example). * ... compare local images with registry ones (e.g. know, if image tagged "latest" was re-pushed). -## How do I use it myself? -I run `lstags` inside a Cron Job on my Kubernetes worker nodes to poll my own Docker registry for a new [stable] images. +... pull Ubuntu 14.04 & 16.04, all the Alpine images and Debian "stretch" to have latest software to play with: ``` -lstags --pull registry.ivanilves.local/tools/sicario~/v1\\.[0-9]+$/ +lstags --pull ubuntu~/^1[46]\\.04$/ alpine debian~/stretch/ ``` -**NB!** In case you use private registry with authentication, make sure your Docker client knows how to authenticate against it! -`lstags` will reuse credentials saved by Docker client in its `config.json` file, one usually found at `~/.docker/config.json` - -... and following cronjob runs on my CI server to ensure I always have latest Ubuntu 14.04 and 16.04 images to play with: +... pull and re-push CoreOS-related images from `quay.io` to your own registry (in case these hipsters will break everything): ``` -lstags --pull ubuntu~/^1[46]\\.04$/ +lstags --push-prefix=/quay --push-registry=registry.company.io quay.io/coreos/hyperkube quay.io/coreos/flannel ``` -My CI server is connected over crappy Internet link and pulling images in advance makes `docker run` much faster. :wink: +**NB!** In case you use private registry with authentication, make sure your Docker client knows how to authenticate against it! +`lstags` will reuse credentials saved by Docker client in its `config.json` file, one usually found at `~/.docker/config.json` ## Image state `lstags` distinguishes four states of Docker image: @@ -55,7 +52,6 @@ There is also special `UNKNOWN` state, which means `lstags` failed to detect ima You can either: * rely on `lstags` discovering credentials "automagically" :tophat: * load credentials from any Docker JSON config file specified -* pass username and password explicitly, via the command line ## Install: Binaries https://github.com/ivanilves/lstags/releases diff --git a/main.go b/main.go index 61e5866..b502840 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "os" + "strings" "github.com/jessevdk/go-flags" @@ -21,8 +22,10 @@ import ( type Options struct { DockerJSON string `short:"j" long:"docker-json" default:"~/.docker/config.json" description:"JSON file with credentials" env:"DOCKER_JSON"` Pull bool `short:"p" long:"pull" description:"Pull Docker images matched by filter (will use local Docker deamon)" env:"PULL"` + Push bool `short:"P" long:"push" description:"Push Docker images matched by filter to some registry (See 'push-registry')" env:"PUSH"` PushRegistry string `short:"r" long:"push-registry" description:"[Re]Push pulled images to a specified remote registry" env:"PUSH_REGISTRY"` PushPrefix string `short:"R" long:"push-prefix" description:"[Re]Push pulled images with a specified repo path prefix" env:"PUSH_PREFIX"` + PushUpdate bool `short:"U" long:"push-update" description:"Update our pushed images if remote image digest changes" env:"PUSH_UPDATE"` ConcurrentRequests int `short:"c" long:"concurrent-requests" default:"32" description:"Limit of concurrent requests to the registry" env:"CONCURRENT_REQUESTS"` TraceRequests bool `short:"T" long:"trace-requests" description:"Trace Docker registry HTTP requests" env:"TRACE_REQUESTS"` DoNotFail bool `short:"N" long:"do-not-fail" description:"Do not fail on non-critical errors (could be dangerous!)" env:"DO_NOT_FAIL"` @@ -61,8 +64,12 @@ func parseFlags() (*Options, error) { return nil, errors.New("Need at least one repository name, e.g. 'nginx~/^1\\\\.13/' or 'mesosphere/chronos'") } - if o.PushRegistry != "" { - o.Pull = true + if o.PushRegistry != "localhost:5000" && o.PushRegistry != "" { + o.Push = true + } + + if o.Pull && o.Push { + return nil, errors.New("You either '--pull' or '--push', not both") } remote.TraceRequests = o.TraceRequests @@ -92,17 +99,15 @@ func main() { suicide(err, true) } - const format = "%-12s %-45s %-15s %-25s %s\n" - fmt.Printf(format, "", "", "<(local) ID>", "", "") - repoCount := len(o.Positional.Repositories) - pullCount := 0 - pushCount := 0 tcc := make(chan tag.Collection, repoCount) + pullCount := 0 + pushCount := 0 + for _, repoWithFilter := range o.Positional.Repositories { - go func(repoWithFilter string, concurrentRequests int, tcc chan tag.Collection) { + go func(repoWithFilter string, tcc chan tag.Collection) { repository, filter, err := util.SeparateFilterAndRepo(repoWithFilter) if err != nil { suicide(err, true) @@ -113,6 +118,8 @@ func main() { repoPath := docker.GetRepoPath(repository, registry) repoName := docker.GetRepoName(repository, registry) + fmt.Printf("ANALYZE %s\n", repoName) + username, password, _ := dockerConfig.GetCredentials(registry) tr, err := auth.NewToken(registry, repoPath, username, password) @@ -120,7 +127,7 @@ func main() { suicide(err, true) } - remoteTags, err := remote.FetchTags(registry, repoPath, tr.AuthHeader(), concurrentRequests) + remoteTags, err := remote.FetchTags(registry, repoPath, tr.AuthHeader(), o.ConcurrentRequests, filter) if err != nil { suicide(err, true) } @@ -133,6 +140,7 @@ func main() { sortedKeys, names, joinedTags := tag.Join(remoteTags, localTags) tags := make([]*tag.Tag, 0) + pullTags := make([]*tag.Tag, 0) for _, key := range sortedKeys { name := names[key] @@ -143,34 +151,91 @@ func main() { } if tg.NeedsPull() { + pullTags = append(pullTags, tg) pullCount++ } - pushCount++ tags = append(tags, tg) } + var pushPrefix string + pushTags := make([]*tag.Tag, 0) + if o.Push { + tags = make([]*tag.Tag, 0) + + pushPrefix = o.PushPrefix + if pushPrefix == "" { + pushPrefix = util.GeneratePathFromHostname(registry) + } + + var pushRepoPath string + pushRepoPath = pushPrefix + "/" + repoPath + pushRepoPath = pushRepoPath[1:] // Leading "/" in prefix should be removed! + + username, password, _ := dockerConfig.GetCredentials(o.PushRegistry) + + tr, err := auth.NewToken(o.PushRegistry, pushRepoPath, username, password) + if err != nil { + suicide(err, true) + } + + alreadyPushedTags, err := remote.FetchTags(o.PushRegistry, pushRepoPath, tr.AuthHeader(), o.ConcurrentRequests, filter) + if err != nil { + if !strings.Contains(err.Error(), "404 Not Found") { + suicide(err, true) + } + + alreadyPushedTags = make(map[string]*tag.Tag) + } + + sortedKeys, names, joinedTags := tag.Join(remoteTags, alreadyPushedTags) + for _, key := range sortedKeys { + name := names[key] + + tg := joinedTags[name] + + if !util.DoesMatch(tg.GetName(), filter) { + continue + } + + if tg.NeedsPush(o.PushUpdate) { + pushTags = append(pushTags, tg) + pushCount++ + } + + tags = append(tags, tg) + } + } + tcc <- tag.Collection{ - Registry: registry, - RepoName: repoName, - RepoPath: repoPath, - Tags: tags, + Registry: registry, + RepoName: repoName, + RepoPath: repoPath, + Tags: tags, + PullTags: pullTags, + PushTags: pushTags, + PushPrefix: pushPrefix, } - }(repoWithFilter, o.ConcurrentRequests, tcc) + }(repoWithFilter, tcc) } - tagCollections := make([]tag.Collection, repoCount) - repoNumber := 0 + tagCollections := make([]tag.Collection, repoCount-1) + + r := 0 for tc := range tcc { - tagCollections = append(tagCollections, tc) + fmt.Printf("FETCHED %s\n", tc.RepoName) - repoNumber++ + tagCollections = append(tagCollections, tc) + r++ - if repoNumber >= repoCount { + if r >= repoCount { close(tcc) } } + const format = "%-12s %-45s %-15s %-25s %s\n" + fmt.Printf("-\n") + fmt.Printf(format, "", "", "<(local) ID>", "", "") for _, tc := range tagCollections { for _, tg := range tc.Tags { fmt.Printf( @@ -183,25 +248,23 @@ func main() { ) } } + fmt.Printf("-\n") if o.Pull { done := make(chan bool, pullCount) for _, tc := range tagCollections { go func(tc tag.Collection, done chan bool) { - for _, tg := range tc.Tags { - if tg.NeedsPull() { - ref := tc.RepoName + ":" + tg.GetName() - - fmt.Printf("PULLING %s\n", ref) - err := dc.Pull(ref) - if err != nil { - suicide(err, false) - } + for _, tg := range tc.PullTags { + ref := tc.RepoName + ":" + tg.GetName() - done <- true + fmt.Printf("PULLING %s\n", ref) + err := dc.Pull(ref) + if err != nil { + suicide(err, false) } + done <- true } }(tc, done) } @@ -218,21 +281,25 @@ func main() { } } - if o.Pull && o.PushRegistry != "" { + if o.Push { done := make(chan bool, pushCount) for _, tc := range tagCollections { - go func(tc tag.Collection, pushRegistry, pushPrefix string, done chan bool) { - for _, tg := range tc.Tags { - if pushPrefix == "" { - pushPrefix = util.GeneratePathFromHostname(tc.Registry) - } + go func(tc tag.Collection, done chan bool) { + for _, tg := range tc.PushTags { + var err error srcRef := tc.RepoName + ":" + tg.GetName() - dstRef := pushRegistry + pushPrefix + "/" + tc.RepoPath + ":" + tg.GetName() + dstRef := o.PushRegistry + tc.PushPrefix + "/" + tc.RepoPath + ":" + tg.GetName() + + fmt.Printf("[PULL/PUSH] PULLING %s\n", srcRef) + err = dc.Pull(srcRef) + if err != nil { + suicide(err, false) + } - fmt.Printf("PUSHING %s => %s\n", srcRef, dstRef) - err := dc.Tag(srcRef, dstRef) + fmt.Printf("[PULL/PUSH] PUSHING %s => %s\n", srcRef, dstRef) + err = dc.Tag(srcRef, dstRef) if err != nil { suicide(err, true) } @@ -243,7 +310,7 @@ func main() { done <- true } - }(tc, o.PushRegistry, o.PushPrefix, done) + }(tc, done) } p := 0 diff --git a/main_test.go b/main_test.go index a885508..3a014a5 100644 --- a/main_test.go +++ b/main_test.go @@ -29,7 +29,7 @@ func TestDockerHubWithPublicRepo(t *testing.T) { t.Fatalf("Failed to get DockerHub public repo token: %s", err.Error()) } - tags, err := remote.FetchTags(dockerHub, repo, tr.AuthHeader(), 128) + tags, err := remote.FetchTags(dockerHub, repo, tr.AuthHeader(), 128, ".*") if err != nil { t.Fatalf("Failed to list DockerHub public repo (%s) tags: %s", repo, err.Error()) } @@ -60,7 +60,7 @@ func TestDockerHubWithPrivateRepo(t *testing.T) { t.Fatalf("Failed to get DockerHub private repo token: %s", err.Error()) } - tags, err := remote.FetchTags(dockerHub, repo, tr.AuthHeader(), 128) + tags, err := remote.FetchTags(dockerHub, repo, tr.AuthHeader(), 128, ".*") if err != nil { t.Fatalf("Failed to list DockerHub private repo (%s) tags: %s", repo, err.Error()) } diff --git a/tag/remote/remote.go b/tag/remote/remote.go index 26bbfc5..2901e5d 100644 --- a/tag/remote/remote.go +++ b/tag/remote/remote.go @@ -15,6 +15,7 @@ import ( "time" "github.com/ivanilves/lstags/tag" + "github.com/ivanilves/lstags/util" ) // WebSchema defines how do we connect to remote web servers @@ -119,25 +120,31 @@ func fetchTagNames(registry, repo, authorization string) ([]string, error) { return tagNames, nil } -func extractCreatedFromHistory(s string) (int64, error) { +type imageMetadata struct { + Created int64 + ContainerID string +} + +func extractMetadataFromHistory(s string) (imageMetadata, error) { var history struct { - Created string `json:"created"` + Created string `json:"created"` + ContainerID string `json:"container"` } err := json.Unmarshal([]byte(s), &history) if err != nil { - return 0, err + return imageMetadata{}, err } t, err := time.Parse(time.RFC3339, history.Created) - return t.Unix(), nil + return imageMetadata{t.Unix(), history.ContainerID}, nil } -func fetchCreated(url, authorization string) (int64, error) { +func fetchMetadata(url, authorization string) (imageMetadata, error) { resp, err := httpRequest(url, authorization, "v1") if err != nil { - return -1, nil + return imageMetadata{}, nil } var v1manifest struct { @@ -146,19 +153,19 @@ func fetchCreated(url, authorization string) (int64, error) { decodingError := json.NewDecoder(resp.Body).Decode(&v1manifest) if decodingError != nil { - return -1, decodingError + return imageMetadata{}, decodingError } if len(v1manifest.History) > 0 { - created, err := extractCreatedFromHistory(v1manifest.History[0]["v1Compatibility"]) + metadata, err := extractMetadataFromHistory(v1manifest.History[0]["v1Compatibility"]) if err != nil { - return -1, err + return imageMetadata{}, err } - return created, nil + return metadata, nil } - return -1, errors.New("no source to fetch image creation date/time from") + return imageMetadata{}, errors.New("no source to fetch image creation date/time from") } func fetchDigest(url, authorization string) (string, error) { @@ -175,11 +182,11 @@ func fetchDigest(url, authorization string) (string, error) { return digests[0], nil } -func fetchDetails(registry, repo, tagName, authorization string) (string, int64, error) { +func fetchDetails(registry, repo, tagName, authorization string) (string, imageMetadata, error) { url := WebSchema + registry + "/v2/" + repo + "/manifests/" + tagName dc := make(chan string, 0) - cc := make(chan int64, 0) + mc := make(chan imageMetadata, 0) ec := make(chan error, 0) go func(url, authorization string, dc chan string, ec chan error) { @@ -191,41 +198,42 @@ func fetchDetails(registry, repo, tagName, authorization string) (string, int64, dc <- digest }(url, authorization, dc, ec) - go func(url, authorization string, cc chan int64, ec chan error) { - created, err := fetchCreated(url, authorization) + go func(url, authorization string, mc chan imageMetadata, ec chan error) { + metadata, err := fetchMetadata(url, authorization) if err != nil { ec <- err } - cc <- created - }(url, authorization, cc, ec) + mc <- metadata + }(url, authorization, mc, ec) var digest string - var created int64 + var metadata imageMetadata waitForDigest := true - waitForCreated := true - for waitForDigest || waitForCreated { + waitForMetadata := true + for waitForDigest || waitForMetadata { select { case digest = <-dc: waitForDigest = false - case created = <-cc: - waitForCreated = false + case metadata = <-mc: + waitForMetadata = false case err := <-ec: if err != nil { - return "", 0, err + return "", imageMetadata{}, err } } } - return digest, created, nil + return digest, metadata, nil } type detailResponse struct { - TagName string - Digest string - Created int64 - Error error + TagName string + Digest string + Created int64 + ContainerID string + Error error } func validateConcurrentRequests(concurrentRequests int) (int, error) { @@ -263,17 +271,24 @@ func calculateBatchStepSize(stepNumber, stepsTotal, remain, limit int) int { } // FetchTags looks up Docker repo tags present on remote Docker registry -func FetchTags(registry, repo, authorization string, concurrentRequests int) (map[string]*tag.Tag, error) { +func FetchTags(registry, repo, authorization string, concurrentRequests int, filter string) (map[string]*tag.Tag, error) { batchLimit, err := validateConcurrentRequests(concurrentRequests) if err != nil { return nil, err } - tagNames, err := fetchTagNames(registry, repo, authorization) + allTagNames, err := fetchTagNames(registry, repo, authorization) if err != nil { return nil, err } + tagNames := make([]string, 0) + for _, tagName := range allTagNames { + if util.DoesMatch(tagName, filter) { + tagNames = append(tagNames, tagName) + } + } + tags := make(map[string]*tag.Tag) batchSteps, batchRemain := calculateBatchSteps(len(tagNames), batchLimit) @@ -287,9 +302,9 @@ func FetchTags(registry, repo, authorization string, concurrentRequests int) (ma for s := 1; s <= stepSize; s++ { go func(registry, repo, tagName, authorization string, ch chan detailResponse) { - digest, created, err := fetchDetails(registry, repo, tagName, authorization) + digest, metadata, err := fetchDetails(registry, repo, tagName, authorization) - ch <- detailResponse{TagName: tagName, Digest: digest, Created: created, Error: err} + ch <- detailResponse{TagName: tagName, Digest: digest, Created: metadata.Created, ContainerID: metadata.ContainerID, Error: err} }(registry, repo, tagNames[tagIndex], authorization, ch) tagIndex++ @@ -308,6 +323,7 @@ func FetchTags(registry, repo, authorization string, concurrentRequests int) (ma } tt.SetCreated(dr.Created) + tt.SetContainerID(dr.ContainerID) tags[tt.GetName()] = tt } diff --git a/tag/tag.go b/tag/tag.go index 0b60468..437b99b 100644 --- a/tag/tag.go +++ b/tag/tag.go @@ -10,11 +10,12 @@ import ( // Tag aggregates tag-related information: tag name, image digest etc type Tag struct { - name string - digest string - imageID string - state string - created int64 + name string + digest string + imageID string + state string + created int64 + containerID string } // SortKey returns a sort key @@ -67,6 +68,11 @@ func (tg *Tag) GetImageID() string { return tg.imageID } +// HasImageID tells us if Docker image has an ID defined +func (tg *Tag) HasImageID() bool { + return len(tg.imageID) > 0 +} + // SetState sets repo tag state func (tg *Tag) SetState(state string) { tg.state = state @@ -86,6 +92,15 @@ func (tg *Tag) NeedsPull() bool { return false } +// NeedsPush tells us if tag/image needs push to a registry +func (tg *Tag) NeedsPush(doUpdate bool) bool { + if tg.state == "ABSENT" || tg.state == "UNKNOWN" || (tg.state == "CHANGED" && doUpdate) { + return true + } + + return false +} + // SetCreated sets image creation timestamp func (tg *Tag) SetCreated(created int64) { tg.created = created @@ -96,6 +111,21 @@ func (tg *Tag) GetCreated() int64 { return tg.created } +// SetContainerID sets "container ID": an OAF "image digest" generated locally +func (tg *Tag) SetContainerID(containerID string) { + tg.containerID = containerID +} + +// GetContainerID gets "container ID": an OAF "image digest" generated locally +func (tg *Tag) GetContainerID() string { + return tg.containerID +} + +// HasContainerID tells us if tag has "container ID" +func (tg *Tag) HasContainerID() bool { + return len(tg.containerID) > 0 +} + // GetCreatedKey gets image creation timestamp in a string form (for a string sort e.g.) func (tg *Tag) GetCreatedKey() string { return strconv.FormatInt(tg.created, 10) @@ -144,6 +174,12 @@ func calculateState(name string, remoteTags, localTags map[string]*Tag) string { return "PRESENT" } + if r.HasContainerID() && l.HasContainerID() { + if r.GetContainerID() == l.GetContainerID() { + return "PRESENT" + } + } + return "CHANGED" } @@ -168,7 +204,7 @@ func Join(remoteTags, localTags map[string]*Tag) ([]string, map[string]string, m joinedTags[name] = remoteTags[name] ltg, defined := localTags[name] - if defined { + if defined && ltg.HasImageID() { joinedTags[name].SetImageID(ltg.GetImageID()) } else { joinedTags[name].SetImageID("n/a") @@ -204,8 +240,11 @@ func Join(remoteTags, localTags map[string]*Tag) ([]string, map[string]string, m // Collection encapsulates collection of tags received from a registry/repository query type Collection struct { - Registry string - RepoName string - RepoPath string - Tags []*Tag + Registry string + RepoName string + RepoPath string + Tags []*Tag + PullTags []*Tag + PushTags []*Tag + PushPrefix string } diff --git a/tag/tag_test.go b/tag/tag_test.go index e47806e..d253b06 100644 --- a/tag/tag_test.go +++ b/tag/tag_test.go @@ -75,6 +75,19 @@ func remoteTags() map[string]*Tag { ) tags["v1.2"] = tg3 + tg4, _ := New( + "v1.3.1", + "sha256:9fb0e8a4f629b72a0a69aef357e637e4145b6588f04f1540a31a0d2e030ea7da", + ) + tags["v1.3.1"] = tg4 + + tg5, _ := New( + "v1.3.2", + "sha256:fc41473fc36c09222a29ffce9eaf5732fae91c3fabfa40aa878f600e13c7fed3", + ) + tg5.SetContainerID("16dcde7895c73c98161aa6981c4ea0df027697cd") + tags["v1.3.2"] = tg5 + return tags } @@ -102,11 +115,24 @@ func localTags() map[string]*Tag { tg3.SetImageID("sha256:4c4ebb9614ef823bd04e5eba65e59286a4314d3a063e2eaa221d38fc21723cea") tags["v1.2"] = tg3 + tg4, _ := New( + "v1.3.1", + "sha256:7264ba7450b6be1bfba9ab29f506293bb324f4764c41ff32dcc04379c1a69c91", + ) + tags["v1.3.1"] = tg4 + + tg5, _ := New( + "v1.3.2", + "sha256:70fbfacca0ab3ec01258b1787b02de77474c6f120b86bb8743b81b7dc37d4aab", + ) + tg5.SetContainerID("16dcde7895c73c98161aa6981c4ea0df027697cd") + tags["v1.3.2"] = tg5 + return tags } func TestJoinLength(t *testing.T) { - const expected = 4 + const expected = 6 _, _, tags := Join(remoteTags(), localTags()) @@ -171,6 +197,8 @@ func TestJoinState(t *testing.T) { "v1.0": "LOCAL-ONLY", "v1.1": "ABSENT", "v1.2": "PRESENT", + "v1.3.1": "CHANGED", + "v1.3.2": "PRESENT", } _, _, tags := Join(remoteTags(), localTags()) @@ -186,3 +214,45 @@ func TestJoinState(t *testing.T) { } } } + +func TestJoinTagNeedsPushWithoutPushUpdate(t *testing.T) { + examples := map[string]bool{ + "v1.3.1": false, + "v1.3.2": false, + } + _, _, tags := Join(remoteTags(), localTags()) + + for name, expected := range examples { + needsPush := tags[name].NeedsPush(false) + + if needsPush != expected { + t.Fatalf( + "Unexpected push need [%s]: %v (expected: %v)", + name, + needsPush, + expected, + ) + } + } +} + +func TestJoinTagNeedsPushWithPushUpdate(t *testing.T) { + examples := map[string]bool{ + "v1.3.1": true, + "v1.3.2": false, + } + _, _, tags := Join(remoteTags(), localTags()) + + for name, expected := range examples { + needsPush := tags[name].NeedsPush(true) + + if needsPush != expected { + t.Fatalf( + "Unexpected push need [%s]: %v (expected: %v)", + name, + needsPush, + expected, + ) + } + } +}