Skip to content

Commit

Permalink
Enable VPA functionality (#115)
Browse files Browse the repository at this point in the history
This change wires in the VPA plugin capability.

Note: The VPA plugin is disabled in `make run` because only one plugin can operate at a time. Some small changes are needed in the Dispatcher so that it sends events only to the plugins requested by the scenario.
juliababkina authored Sep 23, 2020
1 parent e20b19d commit fafba90
Showing 9 changed files with 239 additions and 128 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ goals = serve test build-plugin-k8s
run : build
./build/sim ./build/plugin-k8s

build : build-sim build-plugin-k8s
build : build-sim build-plugin-k8s build-plugin-k8s-vpa

build-sim :
mkdir -p build
2 changes: 2 additions & 0 deletions plugin-k8s-vpa/go.mod
Original file line number Diff line number Diff line change
@@ -41,6 +41,8 @@ replace (
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.17.9
)

// Check out the `plugin` branch of https://github.com/juliababkina/autoscaler.git
// TODO: land these custom changes upstream.
replace k8s.io/autoscaler/vertical-pod-autoscaler => ../../autoscaler/vertical-pod-autoscaler

// TODO: replace this import with github.com/skenario/plugin
41 changes: 19 additions & 22 deletions plugin-k8s-vpa/pkg/plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -55,6 +55,7 @@ type Autoscaler struct {
stats map[string]*proto.Stat
}

var containerName = "container"
var checkpointsGCInterval = flag.Duration("checkpoints-gc-interval", 3*time.Second, `How often orphaned checkpoints should be garbage collected`)

// Create a non-concurrent, non-cached informer for simulation.
@@ -207,7 +208,7 @@ func NewAutoscaler(vpaYaml string) (*Autoscaler, error) {
Window: metav1.Duration{Duration: time.Minute},
Containers: []metricsapi.ContainerMetrics{
{
Name: pod.Name,
Name: containerName,
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(
int64(cpu),
@@ -267,44 +268,40 @@ func (a *Autoscaler) VerticalRecommendation(now int64) ([]*proto.RecommendedPodR
//for _, rec := range a.vpa.Status.Recommendation.ContainerRecommendations {
//
// recommendation = append(recommendation, &proto.RecommendedPodResources{
// PodName: rec.ContainerName,
// LowerBound: rec.LowerBound.Cpu().Value(),
// UpperBound: rec.UpperBound.Cpu().Value(),
// Target: rec.Target.Cpu().Value(),
// ResourceName: v1.ResourceCPU.String(),
// })
//
// recommendation = append(recommendation, &proto.RecommendedPodResources{
// PodName: rec.ContainerName,
// LowerBound: rec.LowerBound.Memory().Value(),
// UpperBound: rec.UpperBound.Memory().Value(),
// Target: rec.Target.Memory().Value(),
// ResourceName: v1.ResourceMemory.String(),
// })
//}
//TODO remove this code after fulfilling https://github.com/pivotal/skenario/issues/100
vpa := a.recommender.GetClusterState().Vpas[model.VpaID{Namespace: "", VpaName: ""}]
vpa := a.recommender.GetClusterState().Vpas[model.VpaID{Namespace: "", VpaName: "my-app-vpa"}] //TODO(#114): fix hardcoded vpa name
if vpa.Recommendation == nil {
return recommendation, nil
}
for _, rec := range vpa.Recommendation.ContainerRecommendations {

//TODO make this part generic
recommendation = append(recommendation, &proto.RecommendedPodResources{
PodName: rec.ContainerName,
LowerBound: int32(rec.LowerBound.Cpu().Value()),
UpperBound: int32(rec.UpperBound.Cpu().Value()),
Target: int32(rec.Target.Cpu().Value()),
ResourceName: v1.ResourceCPU.String(),
})

recommendation = append(recommendation, &proto.RecommendedPodResources{
PodName: rec.ContainerName,
LowerBound: int32(rec.LowerBound.Memory().Value()),
UpperBound: int32(rec.UpperBound.Memory().Value()),
Target: int32(rec.Target.Memory().Value()),
ResourceName: v1.ResourceMemory.String(),
})
if rec.ContainerName == containerName {
recommendation = append(recommendation, &proto.RecommendedPodResources{
LowerBound: rec.LowerBound.Cpu().MilliValue(),
UpperBound: rec.UpperBound.Cpu().MilliValue(),
Target: rec.Target.Cpu().MilliValue(),
ResourceName: v1.ResourceCPU.String(),
})

recommendation = append(recommendation, &proto.RecommendedPodResources{
LowerBound: rec.LowerBound.Memory().Value(),
UpperBound: rec.UpperBound.Memory().Value(),
Target: rec.Target.Memory().Value(),
ResourceName: v1.ResourceMemory.String(),
})
}
}

return recommendation, nil
@@ -375,7 +372,7 @@ func (a *Autoscaler) listPods() ([]*v1.Pod, error) {
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: pod.Name,
Name: containerName,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(strconv.Itoa(int(pod.CpuRequest)) + "m"),
152 changes: 71 additions & 81 deletions plugin/pkg/skplug/proto/skplug.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions plugin/pkg/skplug/proto/skplug.proto
Original file line number Diff line number Diff line change
@@ -59,10 +59,9 @@ message VerticalRecommendationResponse{
}

message RecommendedPodResources{
string pod_name = 1;
int32 lower_bound = 2;
int32 upper_bound = 3;
int32 target = 4;
int64 lower_bound = 2;
int64 upper_bound = 3;
int64 target = 4;
string resource_name = 5;
}

65 changes: 59 additions & 6 deletions sim/pkg/model/autoscaler_ticktock.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ package model

import (
"fmt"
"github.com/josephburnett/sk-plugin/pkg/skplug/proto"
"time"

"skenario/pkg/simulator"
@@ -39,7 +40,7 @@ func (asts *autoscalerTicktockStock) Name() simulator.StockName {
}

func (asts *autoscalerTicktockStock) KindStocked() simulator.EntityKind {
return "HPAAutoscaler"
return "Autoscaler"
}

func (asts *autoscalerTicktockStock) Count() uint64 {
@@ -62,6 +63,15 @@ func (asts *autoscalerTicktockStock) Add(entity simulator.Entity) error {
currentTime := asts.env.CurrentMovementTime()

asts.cluster.RecordToAutoscaler(&currentTime)

asts.adjustHorizontally(&currentTime)
asts.adjustVertically(&currentTime)

asts.calculateCPUUtilization()

return nil
}
func (asts *autoscalerTicktockStock) adjustHorizontally(currentTime *time.Time) {
autoscalerDesired, err := asts.env.Plugin().HorizontalRecommendation(currentTime.UnixNano())
if err != nil {
panic(err)
@@ -74,7 +84,7 @@ func (asts *autoscalerTicktockStock) Add(entity simulator.Entity) error {
desiredEntity := simulator.NewEntity("Desired", "Desired")
err := asts.desiredSource.Add(desiredEntity)
if err != nil {
return err
panic(err)
}

asts.env.AddToSchedule(simulator.NewMovement(
@@ -98,13 +108,56 @@ func (asts *autoscalerTicktockStock) Add(entity simulator.Entity) error {
} else {
// do nothing
}
}

//calculate CPU utilization
asts.calculateCPUUtilization()
func (asts *autoscalerTicktockStock) adjustVertically(currentTime *time.Time) {
recommendedPodResources, err := asts.env.Plugin().VerticalRecommendation(currentTime.UnixNano())
if err != nil {
panic(err)
}

return nil
}
var cpuRecommendation *proto.RecommendedPodResources
//get cpu recommendation
for _, recommendation := range recommendedPodResources {
if recommendation.GetResourceName() == "cpu" {
cpuRecommendation = recommendation
}
}
if cpuRecommendation == nil {
return
}

//Iterate through replicas
pods := asts.cluster.ActiveStock().EntitiesInStock()
for _, pod := range pods {
//Check if we need to update this replica
resourceRequest := int64((*pod).(Replica).GetCPUCapacity())
if resourceRequest < cpuRecommendation.LowerBound || resourceRequest > cpuRecommendation.UpperBound {
//update
//We create new one with recommendations
newReplica := NewReplicaEntity(asts.env, &asts.cluster.(*clusterModel).replicaSource.(*replicaSource).failedSink).(simulator.Entity)
newReplica.(*replicaEntity).totalCPUCapacityMillisPerSecond = float64(cpuRecommendation.Target)
asts.cluster.LaunchingStock().Add(newReplica)

asts.env.AddToSchedule(simulator.NewMovement(
"create_updated_replica",
currentTime.Add(asts.cluster.Desired().(*replicasDesiredStock).config.LaunchDelay),
asts.cluster.LaunchingStock(),
asts.cluster.ActiveStock(),
&newReplica,
))

//We evict the existent replica
asts.env.AddToSchedule(simulator.NewMovement(
"evict_replica",
currentTime.Add(asts.cluster.Desired().(*replicasDesiredStock).config.LaunchDelay).Add(time.Nanosecond),
asts.cluster.ActiveStock(),
asts.cluster.TerminatingStock(),
pod,
))
}
}
}
func (asts *autoscalerTicktockStock) calculateCPUUtilization() {
countActiveReplicas := 0.0
totalCPUUtilization := 0.0 // total cpuUtilization for all active replicas in percentage
70 changes: 65 additions & 5 deletions sim/pkg/model/autoscaler_ticktock_test.go
Original file line number Diff line number Diff line change
@@ -45,20 +45,20 @@ func testAutoscalerTicktock(t *testing.T, describe spec.G, it spec.S) {

replicasConfig = ReplicasConfig{time.Second, time.Second, 100}
cluster = NewCluster(envFake, ClusterConfig{}, replicasConfig)
subject = NewAutoscalerTicktockStock(envFake, simulator.NewEntity("Autoscaler", "HPAAutoscaler"), cluster)
subject = NewAutoscalerTicktockStock(envFake, simulator.NewEntity("Autoscaler", "Autoscaler"), cluster)
rawSubject = subject.(*autoscalerTicktockStock)
})

describe("NewAutoscalerTicktockStock()", func() {
it("sets the entity", func() {
assert.Equal(t, simulator.EntityName("Autoscaler"), rawSubject.autoscalerEntity.Name())
assert.Equal(t, simulator.EntityKind("HPAAutoscaler"), rawSubject.autoscalerEntity.Kind())
assert.Equal(t, simulator.EntityKind("Autoscaler"), rawSubject.autoscalerEntity.Kind())
})
})

describe("KindStocked()", func() {
it("accepts HPA Autoscalers", func() {
assert.Equal(t, subject.KindStocked(), simulator.EntityKind("HPAAutoscaler"))
it("accepts Autoscalers", func() {
assert.Equal(t, subject.KindStocked(), simulator.EntityKind("Autoscaler"))
})
})

@@ -137,7 +137,7 @@ func testAutoscalerTicktock(t *testing.T, describe spec.G, it spec.S) {
})
})

describe("the autoscaler was able to make a recommendation", func() {
describe("the HPA autoscaler was able to make a recommendation", func() {
describe("to scale up", func() {
it.Before(func() {
envFake.ThePlugin.(*FakePluginPartition).scaleTo = 8
@@ -211,5 +211,65 @@ func testAutoscalerTicktock(t *testing.T, describe spec.G, it spec.S) {
})
})

describe("driving the VPA autoscaler", func() {
describe("the VPA autoscaler was able to make a recommendation", func() {
describe("to scale", func() {
it.Before(func() {
rawCluster := cluster.(*clusterModel)
failedSink := simulator.NewSinkStock("fake-requestsFailed", "Request")
replica := NewReplicaEntity(envFake, &failedSink)
replica.(*replicaEntity).totalCPUCapacityMillisPerSecond = 100
err := rawCluster.replicasActive.Add(replica)
assert.NoError(t, err)
envFake.ThePlugin.(*FakePluginPartition).verticalRec = []*proto.RecommendedPodResources{
{
LowerBound: 150,
UpperBound: 500,
Target: 250,
ResourceName: "cpu",
},
}
ent := subject.Remove(nil)
err = subject.Add(ent)
assert.NoError(t, err)
})
it("schedules metrics_tick for updated replica", func() {
assert.Equal(t, simulator.MovementKind("metrics_tick"), envFake.Movements[0].Kind())
})
it("schedules movements into the ReplicasActive stock", func() {
assert.Equal(t, simulator.MovementKind("create_updated_replica"), envFake.Movements[1].Kind())
})
it("schedules movements into the ReplicasTerminating stock", func() {
assert.Equal(t, simulator.MovementKind("evict_replica"), envFake.Movements[2].Kind())
})
})

describe("not to scale", func() {
it.Before(func() {
rawCluster := cluster.(*clusterModel)
failedSink := simulator.NewSinkStock("fake-requestsFailed", "Request")
replica := NewReplicaEntity(envFake, &failedSink)
replica.(*replicaEntity).totalCPUCapacityMillisPerSecond = 100
err := rawCluster.replicasActive.Add(replica)
assert.NoError(t, err)
envFake.ThePlugin.(*FakePluginPartition).verticalRec = []*proto.RecommendedPodResources{
{
LowerBound: 50,
UpperBound: 500,
Target: 250,
ResourceName: "cpu",
},
}
ent := subject.Remove(nil)
err = subject.Add(ent)
assert.NoError(t, err)
})
it("schedules no movements apart from metrics_tick", func() {
assert.Len(t, envFake.Movements, 1)
})
})
})
})

})
}
16 changes: 13 additions & 3 deletions sim/pkg/model/cluster.go
Original file line number Diff line number Diff line change
@@ -36,7 +36,9 @@ type ClusterModel interface {
CurrentActive() uint64
RecordToAutoscaler(atTime *time.Time)
RoutingStock() RequestsRoutingStock
ActiveStock() simulator.ThroughStock
ActiveStock() ReplicasActiveStock
TerminatingStock() ReplicasTerminatingStock
LaunchingStock() simulator.ThroughStock
}

type clusterModel struct {
@@ -46,7 +48,7 @@ type clusterModel struct {
replicasDesired ReplicasDesiredStock
replicaSource ReplicaSource
replicasLaunching simulator.ThroughStock
replicasActive simulator.ThroughStock
replicasActive ReplicasActiveStock
replicasTerminating ReplicasTerminatingStock
replicasTerminated simulator.SinkStock
requestsInRouting simulator.ThroughStock
@@ -89,10 +91,18 @@ func (cm *clusterModel) RoutingStock() RequestsRoutingStock {
return cm.requestsInRouting
}

func (cm *clusterModel) ActiveStock() simulator.ThroughStock {
func (cm *clusterModel) ActiveStock() ReplicasActiveStock {
return cm.replicasActive
}

func (cm *clusterModel) TerminatingStock() ReplicasTerminatingStock {
return cm.replicasTerminating
}

func (cm *clusterModel) LaunchingStock() simulator.ThroughStock {
return cm.replicasLaunching
}

func NewCluster(env simulator.Environment, config ClusterConfig, replicasConfig ReplicasConfig) ClusterModel {
replicasActive := NewReplicasActiveStock(env)
requestsFailed := simulator.NewSinkStock("RequestsFailed", "Request")
12 changes: 6 additions & 6 deletions sim/pkg/model/fakes.go
Original file line number Diff line number Diff line change
@@ -131,10 +131,11 @@ func NewFakeReplica() *FakeReplica {
}

type FakePluginPartition struct {
scaleTimes []int64
stats []*proto.Stat
scaleTo int32
plugin skplug.Plugin
scaleTimes []int64
stats []*proto.Stat
scaleTo int32
plugin skplug.Plugin
verticalRec []*proto.RecommendedPodResources
}

func (fp *FakePluginPartition) Event(time int64, typ proto.EventType, object skplug.Object) error {
@@ -152,8 +153,7 @@ func (fp *FakePluginPartition) HorizontalRecommendation(time int64) (rec int32,
}

func (fp *FakePluginPartition) VerticalRecommendation(time int64) (rec []*proto.RecommendedPodResources, err error) {
//TODO implement it after injecting vertical scaling in Skenario
panic("unimplemented")
return fp.verticalRec, nil
}

func NewFakePluginPartition() *FakePluginPartition {

0 comments on commit fafba90

Please sign in to comment.