diff --git a/cmd/pipeline-manager/main.go b/cmd/pipeline-manager/main.go index 77889429..e94068b0 100644 --- a/cmd/pipeline-manager/main.go +++ b/cmd/pipeline-manager/main.go @@ -154,6 +154,7 @@ func serve() error { TektonClient: tClient, BitbucketClient: bitbucketClient, PipelineRunPruner: pruner, + Logger: logger, }) if err != nil { return err diff --git a/internal/manager/bitbucket.go b/internal/manager/bitbucket.go index 51162db5..05f40ba3 100644 --- a/internal/manager/bitbucket.go +++ b/internal/manager/bitbucket.go @@ -86,3 +86,17 @@ func shouldSkip(bitbucketClient bitbucket.CommitClientInterface, projectKey, rep } return isCiSkipInCommitMessage(c.Message) } + +// getRepoNames retrieves the name of all repositories within the project +// identified by projectKey. +func getRepoNames(bitbucketClient bitbucket.RepoClientInterface, projectKey string) ([]string, error) { + repos := []string{} + rl, err := bitbucketClient.RepoList(projectKey) + if err != nil { + return repos, err + } + for _, n := range rl.Values { + repos = append(repos, n.Name) + } + return repos, nil +} diff --git a/internal/manager/pipeline.go b/internal/manager/pipeline.go index 2713c366..e2a3d779 100644 --- a/internal/manager/pipeline.go +++ b/internal/manager/pipeline.go @@ -33,8 +33,8 @@ const ( ) // createPipelineRun creates a PipelineRun resource -func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctxt context.Context, pData PipelineData) (*tekton.PipelineRun, error) { - pr, err := tektonClient.CreatePipelineRun(ctxt, &tekton.PipelineRun{ +func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctxt context.Context, pData PipelineData, needQueueing bool) (*tekton.PipelineRun, error) { + pr := &tekton.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ GenerateName: fmt.Sprintf("%s-", pData.Name), Labels: pipelineLabels(pData), @@ -55,11 +55,11 @@ func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctx }, }, }, - }, metav1.CreateOptions{}) - if err != nil { - return nil, err } - return pr, nil + if needQueueing { + pr.Spec.Status = tekton.PipelineRunSpecStatusPending + } + return tektonClient.CreatePipelineRun(ctxt, pr, metav1.CreateOptions{}) } // listPipelineRuns lists pipeline runs associated with repository. diff --git a/internal/manager/pipeline_test.go b/internal/manager/pipeline_test.go new file mode 100644 index 00000000..9425604d --- /dev/null +++ b/internal/manager/pipeline_test.go @@ -0,0 +1,64 @@ +package manager + +import ( + "context" + "testing" + + tektonClient "github.com/opendevstack/pipeline/internal/tekton" + "github.com/opendevstack/pipeline/pkg/config" + tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" +) + +func TestCreatePipelineRun(t *testing.T) { + tc := &tektonClient.TestClient{} + ctxt := context.TODO() + pData := PipelineData{ + Name: "foo", + Repository: "repo", + GitRef: "branch", + Stage: config.DevStage, + PVC: "pvc", + } + pr, err := createPipelineRun(tc, ctxt, pData, false) + if err != nil { + t.Fatal(err) + } + if pr.GenerateName != "foo-" { + t.Fatalf("Expected generated name to be foo-, got: %s", pr.GenerateName) + } + if pr.Spec.PipelineRef.Name != "foo" { + t.Fatalf("Expected pipeline ref to be foo, got: %s", pr.Spec.PipelineRef.Name) + } + if pr.Spec.Status != "" { + t.Fatalf("Expected status to be empty, got: %s", pr.Spec.Status) + } + if pr.Labels[repositoryLabel] != pData.Repository { + t.Fatalf("Expected label %s to be %s, got: %s", repositoryLabel, pData.Repository, pr.Labels[repositoryLabel]) + } + if pr.Labels[gitRefLabel] != pData.GitRef { + t.Fatalf("Expected label %s to be %s, got: %s", gitRefLabel, pData.GitRef, pr.Labels[gitRefLabel]) + } + if pr.Labels[stageLabel] != pData.Stage { + t.Fatalf("Expected label %s to be %s, got: %s", stageLabel, pData.Stage, pr.Labels[stageLabel]) + } + workspaceCfg := pr.Spec.Workspaces[0] + if workspaceCfg.Name != sharedWorkspaceName { + t.Fatalf("Expected generated name to be %s, got: %s", sharedWorkspaceName, workspaceCfg.Name) + } + if workspaceCfg.PersistentVolumeClaim.ClaimName != "pvc" { + t.Fatalf("Expected generated name to be pvc, got: %s", workspaceCfg.Name) + } + if len(tc.CreatedPipelineRuns) != 1 { + t.Fatal("No pipeline run created") + } + pr, err = createPipelineRun(tc, ctxt, pData, true) + if err != nil { + t.Fatal(err) + } + if pr.Spec.Status != tekton.PipelineRunSpecStatusPending { + t.Fatalf("Expected status to be pending, got: %s", pr.Spec.Status) + } + if len(tc.CreatedPipelineRuns) != 2 { + t.Fatal("No pipeline run created") + } +} diff --git a/internal/manager/queue.go b/internal/manager/queue.go new file mode 100644 index 00000000..3ca01de3 --- /dev/null +++ b/internal/manager/queue.go @@ -0,0 +1,143 @@ +package manager + +import ( + "context" + "fmt" + "math/rand" + "time" + + tektonClient "github.com/opendevstack/pipeline/internal/tekton" + "github.com/opendevstack/pipeline/pkg/logging" + tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// pipelineRunQueue manages multiple queues. These queues +// can be polled in vertain intervals. +type pipelineRunQueue struct { + queues map[string]bool + pollInterval time.Duration + // logger is the logger to send logging messages to. + logger logging.LeveledLoggerInterface +} + +// StartPolling periodically checks status for given identifier. +// The time until the first time is not more than maxInitialWait. +func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, maxInitialWait time.Duration) chan bool { + quit := make(chan bool) + if q.queues[identifier] { + close(quit) + return quit + } + q.queues[identifier] = true + + maxInitialWaitSeconds := int(maxInitialWait.Seconds()) + var ticker *time.Ticker + if maxInitialWaitSeconds > 1 { + initialWaitSeconds := rand.Intn(maxInitialWaitSeconds-1) + 1 + ticker = time.NewTicker(time.Duration(initialWaitSeconds) * time.Second) + } else { + ticker = time.NewTicker(time.Second) + } + go func() { + for { + select { + case <-quit: + q.queues[identifier] = false + ticker.Stop() + return + case <-ticker.C: + ticker.Stop() + ticker = time.NewTicker(q.pollInterval) + q.logger.Debugf("Advancing queue for %s ...", identifier) + queueLength, err := pt.AdvanceQueue(identifier) + if err != nil { + q.logger.Warnf("error during poll tick: %s", err) + } + if queueLength == 0 { + q.logger.Debugf("Stopping to poll for %s ...", identifier) + close(quit) + } + } + } + }() + + return quit +} + +// QueueAdvancer is the interface passed to +// *pipelineRunQueue#StartPolling. +type QueueAdvancer interface { + // AdvanceQueue is called for each poll step. + AdvanceQueue(repository string) (int, error) +} + +// Queue represents a pipeline run Queue. Pipelines of one repository must +// not run in parallel. +type Queue struct { + TektonClient tektonClient.ClientPipelineRunInterface +} + +// needsQueueing checks if any run has either: +// - pending status set OR +// - is progressing +func needsQueueing(pipelineRuns *tekton.PipelineRunList) bool { + for _, pr := range pipelineRuns.Items { + if pr.Spec.Status == tekton.PipelineRunSpecStatusPending || pipelineRunIsProgressing(pr) { + return true + } + } + return false +} + +// AdvanceQueue starts the oldest pending pipeline run if there is no +// progressing pipeline run at the moment. +// It returns the queue length. +func (s *Server) AdvanceQueue(repository string) (int, error) { + s.Mutex.Lock() + defer s.Mutex.Unlock() + ctxt, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + pipelineRuns, err := listPipelineRuns(s.TektonClient, ctxt, repository) + if err != nil { + return 0, fmt.Errorf("could not retrieve existing pipeline runs: %w", err) + } + s.Logger.Debugf("Found %d pipeline runs related to repository %s.", len(pipelineRuns.Items), repository) + if len(pipelineRuns.Items) == 0 { + return 0, nil + } + + var foundRunning bool + pendingPrs := []tekton.PipelineRun{} + for _, pr := range pipelineRuns.Items { + if pr.IsPending() { + pendingPrs = append(pendingPrs, pr) + continue + } + if pipelineRunIsProgressing(pr) { + foundRunning = true + continue + } + } + s.Logger.Debugf("Found runs for repo %s in state running=%v, pending=%d.", repository, foundRunning, len(pendingPrs)) + + if !foundRunning && len(pendingPrs) > 0 { + // update oldest pending PR + sortPipelineRunsDescending(pendingPrs) + oldestPR := pendingPrs[len(pendingPrs)-1] + pendingPrs = pendingPrs[:len(pendingPrs)-1] + s.Logger.Infof("Starting pending pipeline run %s ...", oldestPR.Name) + oldestPR.Spec.Status = "" // remove pending status -> starts pipeline run + _, err := s.TektonClient.UpdatePipelineRun(ctxt, &oldestPR, metav1.UpdateOptions{}) + if err != nil { + return len(pendingPrs), fmt.Errorf("could not update pipeline run %s: %w", oldestPR.Name, err) + } + } + return len(pendingPrs), nil +} + +// pipelineRunIsProgressing returns true if the PR is not done, not pending, +// not cancelled, and not timed out. +func pipelineRunIsProgressing(pr tekton.PipelineRun) bool { + return !(pr.IsDone() || pr.IsPending() || pr.IsCancelled() || pr.IsTimedOut()) +} diff --git a/internal/manager/queue_test.go b/internal/manager/queue_test.go new file mode 100644 index 00000000..2848d0e5 --- /dev/null +++ b/internal/manager/queue_test.go @@ -0,0 +1,220 @@ +package manager + +import ( + "testing" + "time" + + tektonClient "github.com/opendevstack/pipeline/internal/tekton" + tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// fakeAdvancerDone is always done advancing the queue. +type fakeAdvancerDone struct { +} + +func (f *fakeAdvancerDone) AdvanceQueue(repository string) (int, error) { + return 0, nil +} + +func TestPollIdentifier(t *testing.T) { + p := &pipelineRunQueue{ + queues: map[string]bool{ + "a": true, + "b": false, + }, + pollInterval: time.Second, + } + f := &fakeAdvancerDone{} + p.StartPolling(f, "a", time.Second) + p.StartPolling(f, "b", time.Second) + if !p.queues["a"] { + t.Fatal("polling state for 'a' should be true") + } + if !p.queues["b"] { + t.Fatal("polling state for 'b' should be true") + } +} + +// fakeAdvancerSteps can be called a few times before it is done advancing the queue. +type fakeAdvancerSteps struct { + count int +} + +func (f *fakeAdvancerSteps) AdvanceQueue(repository string) (int, error) { + if f.count < 2 { + f.count++ + return 1, nil + } + return 0, nil +} + +func TestAdvanceQueueAndQuit(t *testing.T) { + p := &pipelineRunQueue{ + queues: map[string]bool{}, + pollInterval: time.Millisecond, + } + f := &fakeAdvancerSteps{} + done := p.StartPolling(f, "a", time.Second) + select { + case <-done: + t.Log("quit occured") + case <-time.After(5 * time.Second): + t.Fatal("quit should have occured") + } +} + +func TestAdvanceQueue(t *testing.T) { + tests := map[string]struct { + runs []*tekton.PipelineRun + wantStart string + wantPollDone bool + }{ + "none": { + runs: []*tekton.PipelineRun{}, + wantStart: "", + wantPollDone: true, + }, + "one cancelled, none pending": { + runs: []*tekton.PipelineRun{ + cancelledPipelineRun(t, "one", time.Now()), + }, + wantStart: "", + wantPollDone: true, + }, + "one cancelled, one pending": { + runs: []*tekton.PipelineRun{ + cancelledPipelineRun(t, "one", time.Now()), + pendingPipelineRun(t, "two", time.Now()), + }, + wantStart: "two", + wantPollDone: true, + }, + "one cancelled, two pending": { + runs: []*tekton.PipelineRun{ + cancelledPipelineRun(t, "one", time.Now()), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + pendingPipelineRun(t, "three", time.Now().Add(time.Minute*-2)), + }, + wantStart: "three", + wantPollDone: false, + }, + "two pending": { + runs: []*tekton.PipelineRun{ + pendingPipelineRun(t, "one", time.Now().Add(time.Minute*-2)), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + }, + wantStart: "one", + wantPollDone: false, + }, + "one timed out, one pending": { + runs: []*tekton.PipelineRun{ + timedOutPipelineRun(t, "one", time.Now().Add(time.Minute*-2)), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + }, + wantStart: "two", + wantPollDone: true, + }, + "one running, one pending": { + runs: []*tekton.PipelineRun{ + runningPipelineRun(t, "one", time.Now().Add(time.Minute*-2)), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + }, + wantStart: "", + wantPollDone: false, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + tclient := &tektonClient.TestClient{PipelineRuns: tc.runs} + s := &Server{TektonClient: tclient} + queueLength, err := s.AdvanceQueue("a") + if err != nil { + t.Fatal(err) + } + if tc.wantStart != "" { + if len(tclient.UpdatedPipelineRuns) != 1 { + t.Fatal("should have updated one run") + } + if tclient.UpdatedPipelineRuns[0] != tc.wantStart { + t.Fatalf("should have updated run '%s'", tc.wantStart) + } + } else { + if len(tclient.UpdatedPipelineRuns) > 0 { + t.Fatal("should not have updated any run") + } + } + if (queueLength == 0) != tc.wantPollDone { + t.Fatalf("want polling to be done: %v, but queue length is: %d", tc.wantPollDone, queueLength) + } + }) + } +} + +func pendingPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + Spec: tekton.PipelineRunSpec{ + Status: tekton.PipelineRunSpecStatusPending, + }, + } + if !pr.IsPending() { + t.Fatal("pr should be pending") + } + return pr +} + +func cancelledPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + Spec: tekton.PipelineRunSpec{ + Status: tekton.PipelineRunSpecStatusCancelled, + }, + } + if !pr.IsCancelled() || pr.IsPending() || pr.IsDone() || pr.IsTimedOut() { + t.Fatal("pr should be cancelled") + } + return pr +} + +func timedOutPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + // pipelineTimeout := pr.Spec.Timeout + // startTime := pr.Status.StartTime + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + Spec: tekton.PipelineRunSpec{ + Timeout: &metav1.Duration{Duration: time.Second}, + }, + Status: tekton.PipelineRunStatus{ + PipelineRunStatusFields: tekton.PipelineRunStatusFields{ + StartTime: &metav1.Time{Time: time.Now().Add(-2 * time.Second)}, + }, + }, + } + if !pr.IsTimedOut() || pr.IsPending() || pr.IsDone() || pr.IsCancelled() { + t.Fatal("pr should be timed out") + } + return pr +} + +func runningPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + } + if pr.IsDone() || pr.IsPending() || pr.IsTimedOut() || pr.IsCancelled() { + t.Fatal("pr should be running") + } + return pr +} diff --git a/internal/manager/server.go b/internal/manager/server.go index 329e00a3..4a5d2145 100644 --- a/internal/manager/server.go +++ b/internal/manager/server.go @@ -27,6 +27,8 @@ const ( allowedChangeRefType = "BRANCH" // letterBytes contains letters to use for random strings. letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + // defaultQueuePollInterval defines the queue pill interval in seconds. + defaultQueuePollIntervalSeconds = 30 ) // Server represents this service, and is a global. @@ -44,6 +46,7 @@ type Server struct { BitbucketClient bitbucketInterface PipelineRunPruner PipelineRunPruner Mutex sync.Mutex + RunQueue *pipelineRunQueue Logger logging.LeveledLoggerInterface } @@ -80,6 +83,9 @@ type ServerConfig struct { BitbucketClient bitbucketInterface // PipelineRunPruner is responsible to prune pipeline runs. PipelineRunPruner PipelineRunPruner + // PollInterval defines the interval between polling for pending pipelines + // in order to start one if possible. + PollInterval time.Duration // Logger is the logger to send logging messages to. Logger logging.LeveledLoggerInterface } @@ -135,6 +141,14 @@ func NewServer(serverConfig ServerConfig) (*Server, error) { if serverConfig.Logger == nil { serverConfig.Logger = &logging.LeveledLogger{Level: logging.LevelError} } + runQueue := &pipelineRunQueue{ + queues: map[string]bool{}, + pollInterval: time.Duration(defaultQueuePollIntervalSeconds) * time.Second, + logger: serverConfig.Logger, + } + if serverConfig.PollInterval > 0 { + runQueue.pollInterval = serverConfig.PollInterval + } s := &Server{ KubernetesClient: serverConfig.KubernetesClient, TektonClient: serverConfig.TektonClient, @@ -148,8 +162,32 @@ func NewServer(serverConfig ServerConfig) (*Server, error) { TaskSuffix: serverConfig.TaskSuffix, StorageConfig: serverConfig.StorageConfig, PipelineRunPruner: serverConfig.PipelineRunPruner, + RunQueue: runQueue, Logger: serverConfig.Logger, } + + go func() { + s.Logger.Infof( + "Retrieving repositories of Bitbucket project %s to check for pending pipeline runs ...", + s.Project, + ) + repos, err := getRepoNames(s.BitbucketClient, s.Project) + if err != nil { + s.Logger.Infof( + "could not retrieve repositories from Bitbucket to poll for pending pipeline runs: %s", + err, + ) + } + s.Logger.Debugf( + "Found %d repositories for which to check for pending pipeline runs.", len(repos), + ) + for _, r := range repos { + s.Logger.Infof( + "Checking for pending pipeline runs for repository %s ...", r, + ) + s.RunQueue.StartPolling(s, r, 30*time.Second) + } + }() return s, nil } @@ -385,7 +423,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { s.Logger.Infof("%s %+v", requestID, pData) - _, err = createPipelineRun(s.TektonClient, r.Context(), pData) + needQueueing := needsQueueing(pipelineRuns) + _, err = createPipelineRun(s.TektonClient, r.Context(), pData, needQueueing) if err != nil { msg := "cannot create pipeline run" s.Logger.Errorf("%s %s: %s", requestID, msg, err) @@ -393,6 +432,12 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { return } + // if need queueing, then schedule poller + if needQueueing { + s.Logger.Debugf("%s Start polling mechanism to start pending pipeline when possible ...", requestID) + s.RunQueue.StartPolling(s, pData.Repository, 30*time.Second) + } + err = json.NewEncoder(w).Encode(pData) if err != nil { s.Logger.Errorf("%s cannot write body: %s", requestID, err)