-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Pipeline runs belonging to one repository now run sequentially. If a pipeline run cannot start immediately, it is created as "pending", and a process is started to periodically check if it can start. Since the pipeline manager service may be restarted, it check on boot if there are any pending runs for the repositories under its control and starts the periodic check for those. We can improve on the design by adding a signal in the finish task of a pipeline run that the run will soon finish, reducing the up to 30s wait time for the next run. Closes #394.
- Loading branch information
1 parent
8e69e9e
commit 359ed15
Showing
7 changed files
with
494 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package manager | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
tektonClient "github.com/opendevstack/pipeline/internal/tekton" | ||
"github.com/opendevstack/pipeline/pkg/config" | ||
tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" | ||
) | ||
|
||
func TestCreatePipelineRun(t *testing.T) { | ||
tc := &tektonClient.TestClient{} | ||
ctxt := context.TODO() | ||
pData := PipelineData{ | ||
Name: "foo", | ||
Repository: "repo", | ||
GitRef: "branch", | ||
Stage: config.DevStage, | ||
PVC: "pvc", | ||
} | ||
pr, err := createPipelineRun(tc, ctxt, pData, false) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
if pr.GenerateName != "foo-" { | ||
t.Fatalf("Expected generated name to be foo-, got: %s", pr.GenerateName) | ||
} | ||
if pr.Spec.PipelineRef.Name != "foo" { | ||
t.Fatalf("Expected pipeline ref to be foo, got: %s", pr.Spec.PipelineRef.Name) | ||
} | ||
if pr.Spec.Status != "" { | ||
t.Fatalf("Expected status to be empty, got: %s", pr.Spec.Status) | ||
} | ||
if pr.Labels[repositoryLabel] != pData.Repository { | ||
t.Fatalf("Expected label %s to be %s, got: %s", repositoryLabel, pData.Repository, pr.Labels[repositoryLabel]) | ||
} | ||
if pr.Labels[gitRefLabel] != pData.GitRef { | ||
t.Fatalf("Expected label %s to be %s, got: %s", gitRefLabel, pData.GitRef, pr.Labels[gitRefLabel]) | ||
} | ||
if pr.Labels[stageLabel] != pData.Stage { | ||
t.Fatalf("Expected label %s to be %s, got: %s", stageLabel, pData.Stage, pr.Labels[stageLabel]) | ||
} | ||
workspaceCfg := pr.Spec.Workspaces[0] | ||
if workspaceCfg.Name != sharedWorkspaceName { | ||
t.Fatalf("Expected generated name to be %s, got: %s", sharedWorkspaceName, workspaceCfg.Name) | ||
} | ||
if workspaceCfg.PersistentVolumeClaim.ClaimName != "pvc" { | ||
t.Fatalf("Expected generated name to be pvc, got: %s", workspaceCfg.Name) | ||
} | ||
if len(tc.CreatedPipelineRuns) != 1 { | ||
t.Fatal("No pipeline run created") | ||
} | ||
pr, err = createPipelineRun(tc, ctxt, pData, true) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
if pr.Spec.Status != tekton.PipelineRunSpecStatusPending { | ||
t.Fatalf("Expected status to be pending, got: %s", pr.Spec.Status) | ||
} | ||
if len(tc.CreatedPipelineRuns) != 2 { | ||
t.Fatal("No pipeline run created") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
package manager | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/rand" | ||
"time" | ||
|
||
tektonClient "github.com/opendevstack/pipeline/internal/tekton" | ||
"github.com/opendevstack/pipeline/pkg/logging" | ||
tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
) | ||
|
||
// pipelineRunQueue manages multiple queues. These queues | ||
// can be polled in vertain intervals. | ||
type pipelineRunQueue struct { | ||
queues map[string]bool | ||
pollInterval time.Duration | ||
// logger is the logger to send logging messages to. | ||
logger logging.LeveledLoggerInterface | ||
} | ||
|
||
// StartPolling periodically checks status for given identifier. | ||
// The time until the first time is not more than maxInitialWait. | ||
func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, maxInitialWait time.Duration) chan bool { | ||
quit := make(chan bool) | ||
if q.queues[identifier] { | ||
close(quit) | ||
return quit | ||
} | ||
q.queues[identifier] = true | ||
|
||
maxInitialWaitSeconds := int(maxInitialWait.Seconds()) | ||
var ticker *time.Ticker | ||
if maxInitialWaitSeconds > 1 { | ||
initialWaitSeconds := rand.Intn(maxInitialWaitSeconds-1) + 1 | ||
ticker = time.NewTicker(time.Duration(initialWaitSeconds) * time.Second) | ||
} else { | ||
ticker = time.NewTicker(time.Second) | ||
} | ||
go func() { | ||
for { | ||
select { | ||
case <-quit: | ||
q.queues[identifier] = false | ||
ticker.Stop() | ||
return | ||
case <-ticker.C: | ||
ticker.Stop() | ||
ticker = time.NewTicker(q.pollInterval) | ||
q.logger.Debugf("Advancing queue for %s ...", identifier) | ||
queueLength, err := pt.AdvanceQueue(identifier) | ||
if err != nil { | ||
q.logger.Warnf("error during poll tick: %s", err) | ||
} | ||
if queueLength == 0 { | ||
q.logger.Debugf("Stopping to poll for %s ...", identifier) | ||
close(quit) | ||
} | ||
} | ||
} | ||
}() | ||
|
||
return quit | ||
} | ||
|
||
// QueueAdvancer is the interface passed to | ||
// *pipelineRunQueue#StartPolling. | ||
type QueueAdvancer interface { | ||
// AdvanceQueue is called for each poll step. | ||
AdvanceQueue(repository string) (int, error) | ||
} | ||
|
||
// Queue represents a pipeline run Queue. Pipelines of one repository must | ||
// not run in parallel. | ||
type Queue struct { | ||
TektonClient tektonClient.ClientPipelineRunInterface | ||
} | ||
|
||
// needsQueueing checks if any run has either: | ||
// - pending status set OR | ||
// - is progressing | ||
func needsQueueing(pipelineRuns *tekton.PipelineRunList) bool { | ||
for _, pr := range pipelineRuns.Items { | ||
if pr.Spec.Status == tekton.PipelineRunSpecStatusPending || pipelineRunIsProgressing(pr) { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
// AdvanceQueue starts the oldest pending pipeline run if there is no | ||
// progressing pipeline run at the moment. | ||
// It returns the queue length. | ||
func (s *Server) AdvanceQueue(repository string) (int, error) { | ||
s.Mutex.Lock() | ||
defer s.Mutex.Unlock() | ||
ctxt, cancel := context.WithTimeout(context.Background(), 5*time.Minute) | ||
defer cancel() | ||
pipelineRuns, err := listPipelineRuns(s.TektonClient, ctxt, repository) | ||
if err != nil { | ||
return 0, fmt.Errorf("could not retrieve existing pipeline runs: %w", err) | ||
} | ||
s.Logger.Debugf("Found %d pipeline runs related to repository %s.", len(pipelineRuns.Items), repository) | ||
if len(pipelineRuns.Items) == 0 { | ||
return 0, nil | ||
} | ||
|
||
var foundRunning bool | ||
pendingPrs := []tekton.PipelineRun{} | ||
for _, pr := range pipelineRuns.Items { | ||
if pr.IsPending() { | ||
pendingPrs = append(pendingPrs, pr) | ||
continue | ||
} | ||
if pipelineRunIsProgressing(pr) { | ||
foundRunning = true | ||
continue | ||
} | ||
} | ||
s.Logger.Debugf("Found runs for repo %s in state running=%v, pending=%d.", repository, foundRunning, len(pendingPrs)) | ||
|
||
if !foundRunning && len(pendingPrs) > 0 { | ||
// update oldest pending PR | ||
sortPipelineRunsDescending(pendingPrs) | ||
oldestPR := pendingPrs[len(pendingPrs)-1] | ||
pendingPrs = pendingPrs[:len(pendingPrs)-1] | ||
s.Logger.Infof("Starting pending pipeline run %s ...", oldestPR.Name) | ||
oldestPR.Spec.Status = "" // remove pending status -> starts pipeline run | ||
_, err := s.TektonClient.UpdatePipelineRun(ctxt, &oldestPR, metav1.UpdateOptions{}) | ||
if err != nil { | ||
return len(pendingPrs), fmt.Errorf("could not update pipeline run %s: %w", oldestPR.Name, err) | ||
} | ||
} | ||
return len(pendingPrs), nil | ||
} | ||
|
||
// pipelineRunIsProgressing returns true if the PR is not done, not pending, | ||
// not cancelled, and not timed out. | ||
func pipelineRunIsProgressing(pr tekton.PipelineRun) bool { | ||
return !(pr.IsDone() || pr.IsPending() || pr.IsCancelled() || pr.IsTimedOut()) | ||
} |
Oops, something went wrong.