diff --git a/CHANGELOG.md b/CHANGELOG.md index fcc079e7..4e1cc4da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ listed in the changelog. - Gradle proxy settings are set during prepare-local-env ([#291](https://github.com/opendevstack/ods-pipeline/issues/291)) - Add `xargs` to helm image as `helm-secrets` depends on it ([#465](https://github.com/opendevstack/ods-pipeline/issues/465)) - Pipeline creation fails when branch names contain slashes ([#466](https://github.com/opendevstack/ods-pipeline/issues/466)) +- Race conditions between pipelines of the same repository ([#394](https://github.com/opendevstack/ods-pipeline/issues/394)) ## [0.2.0] - 2021-12-22 ### Added diff --git a/cmd/pipeline-manager/main.go b/cmd/pipeline-manager/main.go index 77889429..e94068b0 100644 --- a/cmd/pipeline-manager/main.go +++ b/cmd/pipeline-manager/main.go @@ -154,6 +154,7 @@ func serve() error { TektonClient: tClient, BitbucketClient: bitbucketClient, PipelineRunPruner: pruner, + Logger: logger, }) if err != nil { return err diff --git a/docs/design/software-design-specification.adoc b/docs/design/software-design-specification.adoc index d69a2b26..5fb7fa3c 100644 --- a/docs/design/software-design-specification.adoc +++ b/docs/design/software-design-specification.adoc @@ -383,6 +383,8 @@ A pipeline is created or updated corresponding to the Git branch received in the A PVC is created per repository unless it exists already. The name is equal to `ods-workspace-` (shortened to 63 characters if longer). This PVC is then used in the pipeline as a shared workspace. +When no other pipeline run for the same repository is running or pending, the created/updated pipeline is started immediately. Otherwise a pending pipeline run is created, and a periodic polling is kicked off to allow the run to start once possible. Since the pipeline manager does not persist state about pending pipeline runs, polling is also started for all repositories in the related Bitbucket project when the server boots. + Pipelines and pipeline runs are pruned when a webhook trigger is received. Pipeline runs that are newer than the configured time window are protected from pruning. Older pipeline runs are cleaned up to not grow beyond the configured maximum amount. If all pipeline runs of one pipeline can be pruned, the whole pipeline is pruned. The pruning strategy is applied per repository and stage (DEV, QA, PROD) to avoid aggressive pruning of QA and PROD pipeline runs. |=== diff --git a/docs/design/software-requirements-specification.adoc b/docs/design/software-requirements-specification.adoc index 05adc3ec..e60ff59b 100644 --- a/docs/design/software-requirements-specification.adoc +++ b/docs/design/software-requirements-specification.adoc @@ -83,6 +83,9 @@ The tasks shall create artifacts of their work. Those artifacts shall be stored | SRS-PIPELINE-MANAGER-5 | The pipeline manager shall prune pipelines and pipeline runs per repository and stage. + +| SRS-PIPELINE-MANAGER-6 +| The pipeline manager shall prevent concurrent pipeline runs for one repository. |=== === Tasks Requirements diff --git a/internal/manager/bitbucket.go b/internal/manager/bitbucket.go index 51162db5..05f40ba3 100644 --- a/internal/manager/bitbucket.go +++ b/internal/manager/bitbucket.go @@ -86,3 +86,17 @@ func shouldSkip(bitbucketClient bitbucket.CommitClientInterface, projectKey, rep } return isCiSkipInCommitMessage(c.Message) } + +// getRepoNames retrieves the name of all repositories within the project +// identified by projectKey. +func getRepoNames(bitbucketClient bitbucket.RepoClientInterface, projectKey string) ([]string, error) { + repos := []string{} + rl, err := bitbucketClient.RepoList(projectKey) + if err != nil { + return repos, err + } + for _, n := range rl.Values { + repos = append(repos, n.Name) + } + return repos, nil +} diff --git a/internal/manager/pipeline.go b/internal/manager/pipeline.go index 2713c366..e2a3d779 100644 --- a/internal/manager/pipeline.go +++ b/internal/manager/pipeline.go @@ -33,8 +33,8 @@ const ( ) // createPipelineRun creates a PipelineRun resource -func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctxt context.Context, pData PipelineData) (*tekton.PipelineRun, error) { - pr, err := tektonClient.CreatePipelineRun(ctxt, &tekton.PipelineRun{ +func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctxt context.Context, pData PipelineData, needQueueing bool) (*tekton.PipelineRun, error) { + pr := &tekton.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ GenerateName: fmt.Sprintf("%s-", pData.Name), Labels: pipelineLabels(pData), @@ -55,11 +55,11 @@ func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctx }, }, }, - }, metav1.CreateOptions{}) - if err != nil { - return nil, err } - return pr, nil + if needQueueing { + pr.Spec.Status = tekton.PipelineRunSpecStatusPending + } + return tektonClient.CreatePipelineRun(ctxt, pr, metav1.CreateOptions{}) } // listPipelineRuns lists pipeline runs associated with repository. diff --git a/internal/manager/pipeline_test.go b/internal/manager/pipeline_test.go new file mode 100644 index 00000000..9425604d --- /dev/null +++ b/internal/manager/pipeline_test.go @@ -0,0 +1,64 @@ +package manager + +import ( + "context" + "testing" + + tektonClient "github.com/opendevstack/pipeline/internal/tekton" + "github.com/opendevstack/pipeline/pkg/config" + tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" +) + +func TestCreatePipelineRun(t *testing.T) { + tc := &tektonClient.TestClient{} + ctxt := context.TODO() + pData := PipelineData{ + Name: "foo", + Repository: "repo", + GitRef: "branch", + Stage: config.DevStage, + PVC: "pvc", + } + pr, err := createPipelineRun(tc, ctxt, pData, false) + if err != nil { + t.Fatal(err) + } + if pr.GenerateName != "foo-" { + t.Fatalf("Expected generated name to be foo-, got: %s", pr.GenerateName) + } + if pr.Spec.PipelineRef.Name != "foo" { + t.Fatalf("Expected pipeline ref to be foo, got: %s", pr.Spec.PipelineRef.Name) + } + if pr.Spec.Status != "" { + t.Fatalf("Expected status to be empty, got: %s", pr.Spec.Status) + } + if pr.Labels[repositoryLabel] != pData.Repository { + t.Fatalf("Expected label %s to be %s, got: %s", repositoryLabel, pData.Repository, pr.Labels[repositoryLabel]) + } + if pr.Labels[gitRefLabel] != pData.GitRef { + t.Fatalf("Expected label %s to be %s, got: %s", gitRefLabel, pData.GitRef, pr.Labels[gitRefLabel]) + } + if pr.Labels[stageLabel] != pData.Stage { + t.Fatalf("Expected label %s to be %s, got: %s", stageLabel, pData.Stage, pr.Labels[stageLabel]) + } + workspaceCfg := pr.Spec.Workspaces[0] + if workspaceCfg.Name != sharedWorkspaceName { + t.Fatalf("Expected generated name to be %s, got: %s", sharedWorkspaceName, workspaceCfg.Name) + } + if workspaceCfg.PersistentVolumeClaim.ClaimName != "pvc" { + t.Fatalf("Expected generated name to be pvc, got: %s", workspaceCfg.Name) + } + if len(tc.CreatedPipelineRuns) != 1 { + t.Fatal("No pipeline run created") + } + pr, err = createPipelineRun(tc, ctxt, pData, true) + if err != nil { + t.Fatal(err) + } + if pr.Spec.Status != tekton.PipelineRunSpecStatusPending { + t.Fatalf("Expected status to be pending, got: %s", pr.Spec.Status) + } + if len(tc.CreatedPipelineRuns) != 2 { + t.Fatal("No pipeline run created") + } +} diff --git a/internal/manager/queue.go b/internal/manager/queue.go new file mode 100644 index 00000000..a9dc0718 --- /dev/null +++ b/internal/manager/queue.go @@ -0,0 +1,147 @@ +package manager + +import ( + "context" + "fmt" + "math/rand" + "time" + + tektonClient "github.com/opendevstack/pipeline/internal/tekton" + "github.com/opendevstack/pipeline/pkg/logging" + tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// pipelineRunQueue manages multiple queues. These queues +// can be polled in certain intervals. +type pipelineRunQueue struct { + queues map[string]bool + pollInterval time.Duration + // logger is the logger to send logging messages to. + logger logging.LeveledLoggerInterface +} + +// StartPolling periodically checks status for given identifier. +// The time until the first time is not more than maxInitialWait. +func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, maxInitialWait time.Duration) chan bool { + quit := make(chan bool) + if q.queues[identifier] { + close(quit) + return quit + } + q.queues[identifier] = true + + wait(maxInitialWait) + + ticker := time.NewTicker(q.pollInterval) + go func() { + for { + select { + case <-quit: + q.queues[identifier] = false + ticker.Stop() + return + case <-ticker.C: + q.logger.Debugf("Advancing queue for %s ...", identifier) + queueLength, err := pt.AdvanceQueue(identifier) + if err != nil { + q.logger.Warnf("error during poll tick: %s", err) + } + if queueLength == 0 { + q.logger.Debugf("Stopping to poll for %s ...", identifier) + close(quit) + } + } + } + }() + + return quit +} + +// QueueAdvancer is the interface passed to +// *pipelineRunQueue#StartPolling. +type QueueAdvancer interface { + // AdvanceQueue is called for each poll step. + AdvanceQueue(repository string) (int, error) +} + +// Queue represents a pipeline run Queue. Pipelines of one repository must +// not run in parallel. +type Queue struct { + TektonClient tektonClient.ClientPipelineRunInterface +} + +// AdvanceQueue starts the oldest pending pipeline run if there is no +// progressing pipeline run at the moment. +// It returns the queue length. +func (s *Server) AdvanceQueue(repository string) (int, error) { + s.Mutex.Lock() + defer s.Mutex.Unlock() + ctxt, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + pipelineRuns, err := listPipelineRuns(s.TektonClient, ctxt, repository) + if err != nil { + return 0, fmt.Errorf("could not retrieve existing pipeline runs: %w", err) + } + s.Logger.Debugf("Found %d pipeline runs related to repository %s.", len(pipelineRuns.Items), repository) + if len(pipelineRuns.Items) == 0 { + return 0, nil + } + + var foundRunning bool + pendingPrs := []tekton.PipelineRun{} + for _, pr := range pipelineRuns.Items { + if pr.IsPending() { + pendingPrs = append(pendingPrs, pr) + continue + } + if pipelineRunIsProgressing(pr) { + foundRunning = true + continue + } + } + s.Logger.Debugf("Found runs for repo %s in state running=%v, pending=%d.", repository, foundRunning, len(pendingPrs)) + + if !foundRunning && len(pendingPrs) > 0 { + // update oldest pending PR + sortPipelineRunsDescending(pendingPrs) + oldestPR := pendingPrs[len(pendingPrs)-1] + pendingPrs = pendingPrs[:len(pendingPrs)-1] + s.Logger.Infof("Starting pending pipeline run %s ...", oldestPR.Name) + oldestPR.Spec.Status = "" // remove pending status -> starts pipeline run + _, err := s.TektonClient.UpdatePipelineRun(ctxt, &oldestPR, metav1.UpdateOptions{}) + if err != nil { + return len(pendingPrs), fmt.Errorf("could not update pipeline run %s: %w", oldestPR.Name, err) + } + } + return len(pendingPrs), nil +} + +// needsQueueing checks if any run has either: +// - pending status set OR +// - is progressing +func needsQueueing(pipelineRuns *tekton.PipelineRunList) bool { + for _, pr := range pipelineRuns.Items { + if pr.Spec.Status == tekton.PipelineRunSpecStatusPending || pipelineRunIsProgressing(pr) { + return true + } + } + return false +} + +// pipelineRunIsProgressing returns true if the PR is not done, not pending, +// not cancelled, and not timed out. +func pipelineRunIsProgressing(pr tekton.PipelineRun) bool { + return !(pr.IsDone() || pr.IsPending() || pr.IsCancelled() || pr.IsTimedOut()) +} + +// wait waits for up to maxInitialWait. The exact wait time is +// pseudo-randomized if maxInitialWait is longer than one second. +func wait(maxInitialWait time.Duration) { + initialWait := time.Second + if maxInitialWait > time.Second { + initialWait = time.Duration(rand.Intn(int(maxInitialWait.Seconds())-1) + 1) + } + timer := time.NewTimer(initialWait) + <-timer.C +} diff --git a/internal/manager/queue_test.go b/internal/manager/queue_test.go new file mode 100644 index 00000000..347b603a --- /dev/null +++ b/internal/manager/queue_test.go @@ -0,0 +1,229 @@ +package manager + +import ( + "testing" + "time" + + tektonClient "github.com/opendevstack/pipeline/internal/tekton" + "github.com/opendevstack/pipeline/pkg/logging" + tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// fakeAdvancerDone is always done advancing the queue. +type fakeAdvancerDone struct { + Logger logging.LeveledLoggerInterface +} + +func (f *fakeAdvancerDone) AdvanceQueue(repository string) (int, error) { + return 0, nil +} + +func TestPollIdentifier(t *testing.T) { + p := &pipelineRunQueue{ + queues: map[string]bool{ + "a": true, + "b": false, + }, + pollInterval: time.Second, + logger: &logging.LeveledLogger{Level: logging.LevelNull}, + } + f := &fakeAdvancerDone{ + Logger: &logging.LeveledLogger{Level: logging.LevelNull}, + } + p.StartPolling(f, "a", time.Second) + p.StartPolling(f, "b", time.Second) + if !p.queues["a"] { + t.Fatal("polling state for 'a' should be true") + } + if !p.queues["b"] { + t.Fatal("polling state for 'b' should be true") + } +} + +// fakeAdvancerSteps can be called a few times before it is done advancing the queue. +type fakeAdvancerSteps struct { + count int + Logger logging.LeveledLoggerInterface +} + +func (f *fakeAdvancerSteps) AdvanceQueue(repository string) (int, error) { + if f.count < 2 { + f.count++ + return 1, nil + } + return 0, nil +} + +func TestAdvanceQueueAndQuit(t *testing.T) { + p := &pipelineRunQueue{ + queues: map[string]bool{}, + pollInterval: time.Millisecond, + logger: &logging.LeveledLogger{Level: logging.LevelNull}, + } + f := &fakeAdvancerSteps{ + Logger: &logging.LeveledLogger{Level: logging.LevelNull}, + } + done := p.StartPolling(f, "a", time.Second) + select { + case <-done: + t.Log("quit occured") + case <-time.After(5 * time.Second): + t.Fatal("quit should have occured") + } +} + +func TestAdvanceQueue(t *testing.T) { + tests := map[string]struct { + runs []*tekton.PipelineRun + wantStart string + wantPollDone bool + }{ + "none": { + runs: []*tekton.PipelineRun{}, + wantStart: "", + wantPollDone: true, + }, + "one cancelled, none pending": { + runs: []*tekton.PipelineRun{ + cancelledPipelineRun(t, "one", time.Now()), + }, + wantStart: "", + wantPollDone: true, + }, + "one cancelled, one pending": { + runs: []*tekton.PipelineRun{ + cancelledPipelineRun(t, "one", time.Now()), + pendingPipelineRun(t, "two", time.Now()), + }, + wantStart: "two", + wantPollDone: true, + }, + "one cancelled, two pending": { + runs: []*tekton.PipelineRun{ + cancelledPipelineRun(t, "one", time.Now()), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + pendingPipelineRun(t, "three", time.Now().Add(time.Minute*-2)), + }, + wantStart: "three", + wantPollDone: false, + }, + "two pending": { + runs: []*tekton.PipelineRun{ + pendingPipelineRun(t, "one", time.Now().Add(time.Minute*-2)), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + }, + wantStart: "one", + wantPollDone: false, + }, + "one timed out, one pending": { + runs: []*tekton.PipelineRun{ + timedOutPipelineRun(t, "one", time.Now().Add(time.Minute*-2)), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + }, + wantStart: "two", + wantPollDone: true, + }, + "one running, one pending": { + runs: []*tekton.PipelineRun{ + runningPipelineRun(t, "one", time.Now().Add(time.Minute*-2)), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + }, + wantStart: "", + wantPollDone: false, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + tclient := &tektonClient.TestClient{PipelineRuns: tc.runs} + s := &Server{TektonClient: tclient, Logger: &logging.LeveledLogger{Level: logging.LevelNull}} + queueLength, err := s.AdvanceQueue("a") + if err != nil { + t.Fatal(err) + } + if tc.wantStart != "" { + if len(tclient.UpdatedPipelineRuns) != 1 { + t.Fatal("should have updated one run") + } + if tclient.UpdatedPipelineRuns[0] != tc.wantStart { + t.Fatalf("should have updated run '%s'", tc.wantStart) + } + } else { + if len(tclient.UpdatedPipelineRuns) > 0 { + t.Fatal("should not have updated any run") + } + } + if (queueLength == 0) != tc.wantPollDone { + t.Fatalf("want polling to be done: %v, but queue length is: %d", tc.wantPollDone, queueLength) + } + }) + } +} + +func pendingPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + Spec: tekton.PipelineRunSpec{ + Status: tekton.PipelineRunSpecStatusPending, + }, + } + if !pr.IsPending() { + t.Fatal("pr should be pending") + } + return pr +} + +func cancelledPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + Spec: tekton.PipelineRunSpec{ + Status: tekton.PipelineRunSpecStatusCancelled, + }, + } + if !pr.IsCancelled() || pr.IsPending() || pr.IsDone() || pr.IsTimedOut() { + t.Fatal("pr should be cancelled") + } + return pr +} + +func timedOutPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + // pipelineTimeout := pr.Spec.Timeout + // startTime := pr.Status.StartTime + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + Spec: tekton.PipelineRunSpec{ + Timeout: &metav1.Duration{Duration: time.Second}, + }, + Status: tekton.PipelineRunStatus{ + PipelineRunStatusFields: tekton.PipelineRunStatusFields{ + StartTime: &metav1.Time{Time: time.Now().Add(-2 * time.Second)}, + }, + }, + } + if !pr.IsTimedOut() || pr.IsPending() || pr.IsDone() || pr.IsCancelled() { + t.Fatal("pr should be timed out") + } + return pr +} + +func runningPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + } + if pr.IsDone() || pr.IsPending() || pr.IsTimedOut() || pr.IsCancelled() { + t.Fatal("pr should be running") + } + return pr +} diff --git a/internal/manager/server.go b/internal/manager/server.go index 329e00a3..e85b58cc 100644 --- a/internal/manager/server.go +++ b/internal/manager/server.go @@ -27,6 +27,8 @@ const ( allowedChangeRefType = "BRANCH" // letterBytes contains letters to use for random strings. letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + // defaultQueuePollInterval defines the queue poll interval in seconds. + defaultQueuePollIntervalSeconds = 30 ) // Server represents this service, and is a global. @@ -44,6 +46,7 @@ type Server struct { BitbucketClient bitbucketInterface PipelineRunPruner PipelineRunPruner Mutex sync.Mutex + RunQueue *pipelineRunQueue Logger logging.LeveledLoggerInterface } @@ -80,6 +83,9 @@ type ServerConfig struct { BitbucketClient bitbucketInterface // PipelineRunPruner is responsible to prune pipeline runs. PipelineRunPruner PipelineRunPruner + // PollInterval defines the interval between polling for pending pipelines + // in order to start one if possible. + PollInterval time.Duration // Logger is the logger to send logging messages to. Logger logging.LeveledLoggerInterface } @@ -135,6 +141,14 @@ func NewServer(serverConfig ServerConfig) (*Server, error) { if serverConfig.Logger == nil { serverConfig.Logger = &logging.LeveledLogger{Level: logging.LevelError} } + runQueue := &pipelineRunQueue{ + queues: map[string]bool{}, + pollInterval: time.Duration(defaultQueuePollIntervalSeconds) * time.Second, + logger: serverConfig.Logger, + } + if serverConfig.PollInterval > 0 { + runQueue.pollInterval = serverConfig.PollInterval + } s := &Server{ KubernetesClient: serverConfig.KubernetesClient, TektonClient: serverConfig.TektonClient, @@ -148,8 +162,32 @@ func NewServer(serverConfig ServerConfig) (*Server, error) { TaskSuffix: serverConfig.TaskSuffix, StorageConfig: serverConfig.StorageConfig, PipelineRunPruner: serverConfig.PipelineRunPruner, + RunQueue: runQueue, Logger: serverConfig.Logger, } + + go func() { + s.Logger.Infof( + "Retrieving repositories of Bitbucket project %s to check for pending pipeline runs ...", + s.Project, + ) + repos, err := getRepoNames(s.BitbucketClient, s.Project) + if err != nil { + s.Logger.Infof( + "could not retrieve repositories from Bitbucket to poll for pending pipeline runs: %s", + err, + ) + } + s.Logger.Debugf( + "Found %d repositories for which to check for pending pipeline runs.", len(repos), + ) + for _, r := range repos { + s.Logger.Infof( + "Checking for pending pipeline runs for repository %s ...", r, + ) + s.RunQueue.StartPolling(s, r, 10*time.Second) + } + }() return s, nil } @@ -385,7 +423,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { s.Logger.Infof("%s %+v", requestID, pData) - _, err = createPipelineRun(s.TektonClient, r.Context(), pData) + needQueueing := needsQueueing(pipelineRuns) + _, err = createPipelineRun(s.TektonClient, r.Context(), pData, needQueueing) if err != nil { msg := "cannot create pipeline run" s.Logger.Errorf("%s %s: %s", requestID, msg, err) @@ -393,6 +432,12 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { return } + // if need queueing, then schedule poller + if needQueueing { + s.Logger.Debugf("%s Start polling mechanism to start pending pipeline when possible ...", requestID) + s.RunQueue.StartPolling(s, pData.Repository, 30*time.Second) + } + err = json.NewEncoder(w).Encode(pData) if err != nil { s.Logger.Errorf("%s cannot write body: %s", requestID, err)