Skip to content

Commit

Permalink
add support for pipelines controller statefulset ordinals
Browse files Browse the repository at this point in the history
  • Loading branch information
jkhelil authored and tekton-robot committed Oct 15, 2024
1 parent 80c8681 commit efd8c40
Show file tree
Hide file tree
Showing 16 changed files with 445 additions and 4 deletions.
7 changes: 7 additions & 0 deletions docs/TektonPipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ spec:
threads-per-controller: 2
kube-api-qps: 5.0
kube-api-burst: 10
statefulset-ordinals: false
options:
disabled: false
configMaps: {}
Expand Down Expand Up @@ -263,6 +264,7 @@ spec:
threads-per-controller: 2
kube-api-qps: 5.0
kube-api-burst: 10
statefulset-ordinals: false
```
These fields are optional and there is no default values. If user passes them, operator will include most of fields into the deployment `tekton-pipelines-controller` under the container `tekton-pipelines-controller` as arguments(duplicate name? No, container and deployment has the same name), otherwise pipelines controller's default values will be considered. and `buckets` field is updated into `config-leader-election` config-map under the namespace `tekton-pipelines`.

Expand All @@ -275,6 +277,11 @@ A high level descriptions are given here. To get the detailed information please
* `threads-per-controller` - is the number of threads(aka worker) to use when processing the pipelines controller's workqueue, default value in pipelines controller is `2`
* `kube-api-qps` - QPS indicates the maximum QPS to the cluster master from the REST client, default value in pipeline controller is `5.0`
* `kube-api-burst` - maximum burst for throttle, default value in pipeline controller is `10`
* `statefulset-ordinals` - enables StatefulSet Ordinals mode for the Tekton Pipelines Controller. When set to true, the Pipelines Controller is deployed as a StatefulSet, allowing for multiple replicas to be configured with a load-balancing mode. This ensures that the load is evenly distributed across replicas, and the number of buckets is enforced to match the number of replicas.
Moreover, There are two mechanisms available for scaling for scaling Pipelines Controller horizontally:
- Using leader election, which allows for failover, but can result in hot-spotting.
- Using StatefulSet ordinals, which doesn't allow for failover, but guarantees load is evenly spread across replicas.


> #### Note:
> * `kube-api-qps` and `kube-api-burst` will be multiplied by 2 in pipelines controller. To get the detailed information visit [Performance Configuration](https://tekton.dev/docs/pipelines/tekton-controller-performance-configuration/) guide
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/operator/v1alpha1/tektonpipeline_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,15 @@ func (p *Pipeline) setDefaults() {
p.EnableGitResolver = ptr.Bool(true)
}

// Statefulset Ordinals
// if StatefulSet Ordinals mode, buckets should be equal to replicas
if p.Performance.StatefulsetOrdinals != nil && *p.Performance.StatefulsetOrdinals {
if p.Performance.Replicas != nil && *p.Performance.Replicas > 1 {
replicas := uint(*p.Performance.Replicas)
p.Performance.Buckets = &replicas
}
}

// run platform specific defaulting
if IsOpenShiftPlatform() {
p.openshiftDefaulting()
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/operator/v1alpha1/tektonpipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ type PipelinePerformanceProperties struct {
// +optional
PipelinePerformanceLeaderElectionConfig `json:",inline"`
// +optional
PipelinePerformanceStatefulsetOrdinalsConfig `json:",inline"`
// +optional
PipelineDeploymentPerformanceArgs `json:",inline"`
// +optional
Replicas *int32 `json:"replicas,omitempty"`
Expand All @@ -201,6 +203,12 @@ type PipelinePerformanceLeaderElectionConfig struct {
Buckets *uint `json:"buckets,omitempty"`
}

// allow to configure pipelines controller ha mode to statefulset ordinals
type PipelinePerformanceStatefulsetOrdinalsConfig struct {
//if is true, enable StatefulsetOrdinals mode
StatefulsetOrdinals *bool `json:"statefulset-ordinals,omitempty"`
}

// performance configurations to tune the performance of the pipeline controller
// these properties will be added/updated as arguments in pipeline controller deployment
// https://tekton.dev/docs/pipelines/tekton-controller-performance-configuration/
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/operator/v1alpha1/tektonpipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,15 @@ func (prof *PipelinePerformanceProperties) validate(path string) *apis.FieldErro
}
}

// check for StatefulsetOrdinals and Replicas
if prof.StatefulsetOrdinals != nil && *prof.StatefulsetOrdinals {
if prof.Replicas != nil {
replicas := uint(*prof.Replicas)
if *prof.Buckets != replicas {
errs = errs.Also(apis.ErrInvalidValue(*prof.Replicas, fmt.Sprintf("%s.replicas", path), "spec.performance.replicas must equal spec.performance.buckets for statefulset ordinals"))
}
}
}

return errs
}
29 changes: 29 additions & 0 deletions pkg/apis/operator/v1alpha1/tektonpipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ func TestTektonPipelinePerformancePropertiesValidate(t *testing.T) {
return &value
}

// return pointer value for replicas
getReplicas := func(value int32) *int32 {
return &value
}

statefulsetOrdinals := true

// validate buckets minimum range
tp.Spec.PipelineProperties.Performance = PipelinePerformanceProperties{}
tp.Spec.PipelineProperties.Performance.DisableHA = false
Expand Down Expand Up @@ -310,4 +317,26 @@ func TestTektonPipelinePerformancePropertiesValidate(t *testing.T) {
tp.Spec.PipelineProperties.Performance.Buckets = getBuckets(10)
errs = tp.Validate(context.TODO())
assert.Equal(t, "", errs.Error())

// validate buckets is equal to replicas when StatefulsetOrdinals is true
tp.Spec.PipelineProperties.Performance = PipelinePerformanceProperties{}
tp.Spec.PipelineProperties.Performance.DisableHA = false
bucketValue := uint(5)
tp.Spec.PipelineProperties.Performance.Buckets = getBuckets(bucketValue)
replicaValue := int32(5)
tp.Spec.PipelineProperties.Performance.Replicas = getReplicas(replicaValue)
tp.Spec.PipelineProperties.Performance.StatefulsetOrdinals = &statefulsetOrdinals
errs = tp.Validate(context.TODO())
assert.Equal(t, "", errs.Error())

// validate error when buckets is not equal to replica
tp.Spec.PipelineProperties.Performance = PipelinePerformanceProperties{}
tp.Spec.PipelineProperties.Performance.DisableHA = false
tp.Spec.PipelineProperties.Performance.StatefulsetOrdinals = &statefulsetOrdinals
bucketValue = uint(5)
tp.Spec.PipelineProperties.Performance.Buckets = getBuckets(bucketValue)
tp.Spec.PipelineProperties.Performance.Replicas = getReplicas(3)
errs = tp.Validate(context.TODO())
expectedErrorMessage := "invalid value: 3: spec.performance.replicas\nspec.performance.replicas must equal spec.performance.buckets for statefulset ordinals"
assert.Equal(t, expectedErrorMessage, errs.Error())
}
22 changes: 22 additions & 0 deletions pkg/apis/operator/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 85 additions & 0 deletions pkg/reconciler/common/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -1013,3 +1014,87 @@ func AddSecretData(data map[string][]byte, annotations map[string]string) mf.Tra
return nil
}
}

// ConvertDeploymentToStatefulSet converts a Deployment to a StatefulSet with given parameters
func ConvertDeploymentToStatefulSet(controllerName, serviceName string) mf.Transformer {
return func(u *unstructured.Unstructured) error {
if u.GetKind() != "Deployment" || u.GetName() != controllerName {
return nil
}

d := &appsv1.Deployment{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, d)
if err != nil {
return err
}

ss := &appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
APIVersion: appsv1.SchemeGroupVersion.Group + "/" + appsv1.SchemeGroupVersion.Version,
},
ObjectMeta: d.ObjectMeta,
Spec: appsv1.StatefulSetSpec{
Selector: d.Spec.Selector,
ServiceName: serviceName,
Template: d.Spec.Template,
Replicas: d.Spec.Replicas,
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
},
},
}

unstrObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ss)
if err != nil {
return err
}

u.SetUnstructuredContent(unstrObj)

return nil
}
}

// AddStatefulEnvVars adds environment variables to the statefulset based on given parameters
func AddStatefulEnvVars(controllerName, serviceName, statefulServiceEnvVar, controllerOrdinalEnvVar string) mf.Transformer {
return func(u *unstructured.Unstructured) error {
if u.GetKind() != "StatefulSet" || u.GetName() != controllerName {
return nil
}

ss := &appsv1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, ss)
if err != nil {
return err
}

newEnvVars := []corev1.EnvVar{
{
Name: statefulServiceEnvVar,
Value: serviceName,
},
{
Name: controllerOrdinalEnvVar,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
}

if len(ss.Spec.Template.Spec.Containers) > 0 {
ss.Spec.Template.Spec.Containers[0].Env = append(ss.Spec.Template.Spec.Containers[0].Env, newEnvVars...)
}

unstrObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ss)
if err != nil {
return err
}

u.SetUnstructuredContent(unstrObj)

return nil
}
}
4 changes: 3 additions & 1 deletion pkg/reconciler/kubernetes/tektoninstallerset/client/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func verifyMainInstallerSets(iSets []v1alpha1.TektonInstallerSet) error {
static = true
}
if strings.Contains(iSets[0].GetName(), InstallerSubTypeDeployment) ||
strings.Contains(iSets[1].GetName(), InstallerSubTypeDeployment) {
strings.Contains(iSets[1].GetName(), InstallerSubTypeDeployment) ||
strings.Contains(iSets[0].GetName(), InstallerSubTypeStatefulset) ||
strings.Contains(iSets[1].GetName(), InstallerSubTypeStatefulset) {
deployment = true
}
if !(static && deployment) {
Expand Down
37 changes: 36 additions & 1 deletion pkg/reconciler/kubernetes/tektoninstallerset/client/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func (i *InstallerSetClient) CleanupMainSet(ctx context.Context) error {

// now delete all deployment installerSet
for _, is := range list.Items {
if strings.Contains(is.GetName(), InstallerSubTypeDeployment) {
if strings.Contains(is.GetName(), InstallerSubTypeDeployment) ||
strings.Contains(is.GetName(), InstallerSubTypeStatefulset) {
logger.Debugf("deleting main-deployment installer set: %s", is.GetName())
err = i.clientSet.Delete(ctx, is.GetName(), metav1.DeleteOptions{
PropagationPolicy: &deletePropagationPolicy,
Expand Down Expand Up @@ -125,3 +126,37 @@ func (i *InstallerSetClient) cleanup(ctx context.Context, isType string) error {
}
return nil
}

func (i *InstallerSetClient) CleanupSubTypeDeployment(ctx context.Context) error {
return i.cleanupSubType(ctx, InstallerTypeMain, InstallerSubTypeDeployment)
}

func (i *InstallerSetClient) CleanupSubTypeStatefulset(ctx context.Context) error {
return i.cleanupSubType(ctx, InstallerTypeMain, InstallerSubTypeStatefulset)
}

func (i *InstallerSetClient) cleanupSubType(ctx context.Context, isType string, isSubType string) error {
logger := logging.FromContext(ctx).With("kind", i.resourceKind, "type", isType)

list, err := i.clientSet.List(ctx, metav1.ListOptions{LabelSelector: i.getSetLabels(isType)})
if err != nil {
return err
}

if len(list.Items) != 1 {
logger.Errorf("found more than 1 installerSet for %s something fishy, cleaning up all", isType)
}

for _, is := range list.Items {
if strings.Contains(is.GetName(), isSubType) {
logger.Debugf("deleting %s installer set: %s", isType, is.GetName())
err = i.clientSet.Delete(ctx, is.GetName(), metav1.DeleteOptions{
PropagationPolicy: &deletePropagationPolicy,
})
if err != nil {
return fmt.Errorf("failed to delete %s set: %s", isType, is.GetName())
}
}
}
return nil
}
5 changes: 3 additions & 2 deletions pkg/reconciler/kubernetes/tektoninstallerset/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
)

const (
InstallerSubTypeStatic = "static"
InstallerSubTypeDeployment = "deployment"
InstallerSubTypeStatic = "static"
InstallerSubTypeDeployment = "deployment"
InstallerSubTypeStatefulset = "statefulset"

InstallerTypeMain = "main"
InstallerTypePre = "pre"
Expand Down
21 changes: 21 additions & 0 deletions pkg/reconciler/kubernetes/tektoninstallerset/client/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (i *InstallerSetClient) create(ctx context.Context, comp v1alpha1.TektonCom
func (i *InstallerSetClient) makeMainSets(ctx context.Context, comp v1alpha1.TektonComponent, manifest *mf.Manifest) ([]v1alpha1.TektonInstallerSet, error) {
staticManifest := manifest.Filter(mf.Not(mf.ByKind("Deployment")), mf.Not(mf.ByKind("Service")))
deploymentManifest := manifest.Filter(mf.Any(mf.ByKind("Deployment"), mf.ByKind("Service")))
statefulSetManifest := manifest.Filter(mf.Any(mf.ByKind("StatefulSet"), mf.ByKind("Service")))

kind := strings.ToLower(strings.TrimPrefix(i.resourceKind, "Tekton"))
staticName := fmt.Sprintf("%s-%s-%s-", kind, InstallerTypeMain, InstallerSubTypeStatic)
Expand All @@ -78,6 +79,7 @@ func (i *InstallerSetClient) makeMainSets(ctx context.Context, comp v1alpha1.Tek
}

deployName := fmt.Sprintf("%s-%s-%s-", kind, InstallerTypeMain, InstallerSubTypeDeployment)

deploymentIS, err := i.makeInstallerSet(ctx, comp, &deploymentManifest, deployName, InstallerTypeMain, nil)
if err != nil {
return nil, err
Expand All @@ -87,6 +89,25 @@ func (i *InstallerSetClient) makeMainSets(ctx context.Context, comp v1alpha1.Tek
if err != nil {
return nil, err
}

statefulSet := false
if pipeline, ok := comp.(*v1alpha1.TektonPipeline); ok {
statefulSet = pipeline.Spec.Performance.StatefulsetOrdinals != nil && *pipeline.Spec.Performance.StatefulsetOrdinals
}
if statefulSet {
stsName := fmt.Sprintf("%s-%s-%s-", kind, InstallerTypeMain, InstallerSubTypeStatefulset)
stsIS, err := i.makeInstallerSet(ctx, comp, &statefulSetManifest, stsName, InstallerTypeMain, nil)
if err != nil {
return nil, err
}

stsIS, err = i.clientSet.Create(ctx, stsIS, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return []v1alpha1.TektonInstallerSet{*staticIS, *deploymentIS, *stsIS}, nil
}

return []v1alpha1.TektonInstallerSet{*staticIS, *deploymentIS}, nil
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/reconciler/kubernetes/tektonpipeline/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, tp *v1alpha1.TektonPipel
return err
}

// Pipeline controller is deployed as statefulset, ensure deployment installerset is deleted
if tp.Spec.Performance.StatefulsetOrdinals != nil && *tp.Spec.Performance.StatefulsetOrdinals {
if err := r.installerSetClient.CleanupSubTypeDeployment(ctx); err != nil {
logger.Error("failed to delete main deployment installer set: %v", err)
return err
}
} else {
// Pipeline controller is deployed as deployment, ensure statefulset installerset is deleted
if err := r.installerSetClient.CleanupSubTypeStatefulset(ctx); err != nil {
logger.Error("failed to delete main statefulset installer set: %v", err)
return err
}
}

if err := r.extension.PreReconcile(ctx, tp); err != nil {
msg := fmt.Sprintf("PreReconciliation failed: %s", err.Error())
logger.Error(msg)
Expand Down
Loading

0 comments on commit efd8c40

Please sign in to comment.