
Suggesting patches on restructuring the code (#50)
* integrating Go tablewriter for output formatting (see the usage sketch after the file summary below)

* adding vendor files for new packages

* refactoring code for archiving

* refactored a k8s file a bit. To be continued

* fix a bad rebase

* adding driver field to archive

---------

Co-authored-by: Vishnu Challa <[email protected]>
vishnuchalla and Vishnu Challa authored Apr 6, 2023
1 parent 8b1f0ab commit 6885877
Showing 2 changed files with 108 additions and 191 deletions.
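The tablewriter integration mentioned in the first commit bullet lands in files that are not rendered on this page (only the two diffs below are shown). As a rough sketch of the kind of output formatting it enables (the package path github.com/olekukonko/tablewriter and the sample rows are assumptions, not taken from this diff):

package main

import (
	"os"

	"github.com/olekukonko/tablewriter"
)

func main() {
	// Hypothetical rows; the real commit renders k8s-netperf benchmark results.
	table := tablewriter.NewWriter(os.Stdout)
	table.SetHeader([]string{"Driver", "Profile", "Parallelism", "Avg Throughput"})
	table.Append([]string{"netperf", "TCP_STREAM", "1", "941.20"})
	table.Append([]string{"netperf", "UDP_STREAM", "1", "876.50"})
	table.Render() // prints an ASCII-bordered table to stdout
}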
206 changes: 81 additions & 125 deletions pkg/archive/archive.go
@@ -119,6 +119,68 @@ func IndexDocs(client *opensearch.Client, docs []Doc) error {
return nil
}

+ // Common CSV header fields.
+ func commonCsvHeaderFields() []string {
+ return []string{
+ "Driver",
+ "Profile",
+ "Same node",
+ "Host Network",
+ "Service",
+ "Duration",
+ "Parallelism",
+ "# of Samples",
+ "Message Size",
+ }
+ }
+
+ // Common CSV data fields.
+ func commonCsvDataFields(row result.Data) []string {
+ return []string{
+ fmt.Sprint(row.Driver),
+ fmt.Sprint(row.Profile),
+ fmt.Sprint(row.SameNode),
+ fmt.Sprint(row.HostNetwork),
+ fmt.Sprint(row.Service),
+ strconv.Itoa(row.Duration),
+ strconv.Itoa(row.Parallelism),
+ strconv.Itoa(row.Samples),
+ strconv.Itoa(row.MessageSize),
+ }
+ }
+
+ // writeArchive writes all the metric fields for one role to the CPU and pod archives.
+ func writeArchive(cpuarchive, podarchive *csv.Writer, role string, row result.Data, podResults []metrics.PodCPU) error {
+ roleFieldData := []string{role}
+ for _, pod := range podResults {
+ if err := podarchive.Write(append(append(roleFieldData,
+ commonCsvDataFields(row)...),
+ pod.Name,
+ fmt.Sprintf("%f", pod.Value),
+ )); err != nil {
+ return fmt.Errorf("Failed to write archive to file")
+ }
+ }
+
+ cpu := row.ClientMetrics
+ if role == "Server" {
+ cpu = row.ServerMetrics
+ }
+ if err := cpuarchive.Write(append(append(roleFieldData,
+ commonCsvDataFields(row)...),
+ fmt.Sprintf("%f", cpu.Idle),
+ fmt.Sprintf("%f", cpu.User),
+ fmt.Sprintf("%f", cpu.System),
+ fmt.Sprintf("%f", cpu.Iowait),
+ fmt.Sprintf("%f", cpu.Steal),
+ fmt.Sprintf("%f", cpu.Softirq),
+ fmt.Sprintf("%f", cpu.Irq),
+ )); err != nil {
+ return fmt.Errorf("Failed to write archive to file")
+ }
+ return nil
+ }

// WritePromCSVResult writes the prom data in CSV format
func WritePromCSVResult(r result.ScenarioResults) error {
d := time.Now().Unix()
@@ -136,125 +198,35 @@ func WritePromCSVResult(r result.ScenarioResults) error {
defer cpuarchive.Flush()
podarchive := csv.NewWriter(podfp)
defer podarchive.Flush()
- cpudata := []string{
- "Role",
- "Driver",
- "Profile",
- "Same node",
- "Host Network",
- "Service",
- "Duration",
- "Parallelism",
- "# of Samples",
- "Message Size",
+ roleField := []string{"Role"}
+ cpudata := append(append(roleField,
+ commonCsvHeaderFields()...),
"Idle CPU",
"User CPU",
"System CPU",
"IOWait CPU",
"Steal CPU",
"SoftIRQ CPU",
"IRQ CPU"}
poddata := []string{
"Role",
"Driver",
"Profile",
"Same node",
"Host Network",
"Service",
"Duration",
"Parallelism",
"# of Samples",
"Message Size",
"IRQ CPU",
)
poddata := append(append(roleField,
commonCsvHeaderFields()...),
"Pod Name",
"Utilization"}
"Utilization",
)
if err := cpuarchive.Write(cpudata); err != nil {
return fmt.Errorf("Failed to write cpu archive to file")
}
if err := podarchive.Write(poddata); err != nil {
return fmt.Errorf("Failed to write pod archive to file")
}
for _, row := range r.Results {
- ccpu := row.ClientMetrics
- if err := cpuarchive.Write([]string{
- "Client",
- fmt.Sprint(row.Driver),
- fmt.Sprint(row.Profile),
- fmt.Sprint(row.SameNode),
- fmt.Sprint(row.HostNetwork),
- fmt.Sprint(row.Service),
- strconv.Itoa(row.Duration),
- strconv.Itoa(row.Parallelism),
- strconv.Itoa(row.Samples),
- strconv.Itoa(row.MessageSize),
- fmt.Sprintf("%f", ccpu.Idle),
- fmt.Sprintf("%f", ccpu.User),
- fmt.Sprintf("%f", ccpu.System),
- fmt.Sprintf("%f", ccpu.Iowait),
- fmt.Sprintf("%f", ccpu.Steal),
- fmt.Sprintf("%f", ccpu.Softirq),
- fmt.Sprintf("%f", ccpu.Irq),
- }); err != nil {
- return fmt.Errorf("Failed to write archive to file")
- }
- scpu := row.ServerMetrics
- if err := cpuarchive.Write([]string{
- "Server",
- fmt.Sprint(row.Driver),
- fmt.Sprint(row.Profile),
- fmt.Sprint(row.SameNode),
- fmt.Sprint(row.HostNetwork),
- fmt.Sprint(row.Service),
- strconv.Itoa(row.Duration),
- strconv.Itoa(row.Parallelism),
- strconv.Itoa(row.Samples),
- strconv.Itoa(row.MessageSize),
- fmt.Sprintf("%f", scpu.Idle),
- fmt.Sprintf("%f", scpu.User),
- fmt.Sprintf("%f", scpu.System),
- fmt.Sprintf("%f", scpu.Iowait),
- fmt.Sprintf("%f", scpu.Steal),
- fmt.Sprintf("%f", scpu.Softirq),
- fmt.Sprintf("%f", scpu.Irq),
- }); err != nil {
- return fmt.Errorf("Failed to write archive to file")
- }
- for _, pod := range row.ClientPodCPU.Results {
- if err := podarchive.Write([]string{
- "Server",
- fmt.Sprint(row.Driver),
- fmt.Sprint(row.Profile),
- fmt.Sprint(row.SameNode),
- fmt.Sprint(row.HostNetwork),
- fmt.Sprint(row.Service),
- strconv.Itoa(row.Duration),
- strconv.Itoa(row.Parallelism),
- strconv.Itoa(row.Samples),
- strconv.Itoa(row.MessageSize),
- fmt.Sprintf("%s", pod.Name),
- fmt.Sprintf("%f", pod.Value),
- }); err != nil {
- return fmt.Errorf("Failed to write archive to file")
- }
+ if err := writeArchive(cpuarchive, podarchive, "Client", row, row.ClientPodCPU.Results); err != nil {
+ return err
}
- for _, pod := range row.ServerPodCPU.Results {
- if err := podarchive.Write([]string{
- "Server",
- fmt.Sprint(row.Driver),
- fmt.Sprint(row.Profile),
- fmt.Sprint(row.SameNode),
- fmt.Sprint(row.HostNetwork),
- fmt.Sprint(row.Service),
- strconv.Itoa(row.Duration),
- strconv.Itoa(row.Parallelism),
- strconv.Itoa(row.Samples),
- strconv.Itoa(row.MessageSize),
- fmt.Sprintf("%s", pod.Name),
- fmt.Sprintf("%f", pod.Value),
- }); err != nil {
- return fmt.Errorf("Failed to write archive to file")
- }
+ if err := writeArchive(cpuarchive, podarchive, "Server", row, row.ServerPodCPU.Results); err != nil {
+ return err
}

}
return nil
}
@@ -270,41 +242,25 @@ func WriteCSVResult(r result.ScenarioResults) error {
archive := csv.NewWriter(fp)
defer archive.Flush()

- data := []string{
- "Driver",
- "Profile",
- "Same node",
- "Host Network",
- "Service",
- "Duration",
- "Parallelism",
- "# of Samples",
- "Message Size",
+ data := append(commonCsvHeaderFields(),
"Avg Throughput",
"Throughput Metric",
"99%tile Observed Latency",
"Latency Metric"}
"Latency Metric",
)

if err := archive.Write(data); err != nil {
return fmt.Errorf("Failed to write result archive to file")
}
for _, row := range r.Results {
avg, _ := result.Average(row.ThroughputSummary)
lavg, _ := result.Average(row.LatencySummary)
- data := []string{
- row.Driver,
- row.Profile,
- fmt.Sprint(row.SameNode),
- fmt.Sprint(row.HostNetwork),
- fmt.Sprint(row.Service),
- strconv.Itoa(row.Duration),
- strconv.Itoa(row.Parallelism),
- strconv.Itoa(row.Samples),
- strconv.Itoa(row.MessageSize),
+ data := append(commonCsvDataFields(row),
fmt.Sprintf("%f", avg),
row.Metric,
fmt.Sprint(lavg),
"usec"}
"usec",
)
if err := archive.Write(data); err != nil {
return fmt.Errorf("Failed to write archive to file")
}
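For context, a minimal sketch of a call site for the two writers above, presumably somewhere in the tool's runner; the archiveResults wrapper is hypothetical, and import paths are omitted because they do not appear in this diff:

// Hypothetical wrapper; assumes the pkg/archive and result packages are imported.
func archiveResults(r result.ScenarioResults) error {
	// Summary CSV with throughput and latency columns.
	if err := archive.WriteCSVResult(r); err != nil {
		return err
	}
	// Prometheus-derived CPU and per-pod utilization archives.
	return archive.WritePromCSVResult(r)
}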
93 changes: 27 additions & 66 deletions pkg/k8s/kubernetes.go
@@ -76,6 +76,16 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
ncount := len(nodes.Items)
log.Debugf("Number of nodes with role worker: %d", ncount)

+ zoneNodeSelectorExpression := []apiv1.PreferredSchedulingTerm{
+ {
+ Weight: 100,
+ Preference: apiv1.NodeSelectorTerm{
+ MatchExpressions: []apiv1.NodeSelectorRequirement{
+ {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}},
+ },
+ },
+ },
+ }
if s.NodeLocal {
// Create Netperf client on the same node as the server.
cdp := DeploymentParams{
@@ -89,16 +99,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
}
if z != "" {
cdp.NodeAffinity = apiv1.NodeAffinity{
- PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{
- {
- Weight: 100,
- Preference: apiv1.NodeSelectorTerm{
- MatchExpressions: []apiv1.NodeSelectorRequirement{
- {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}},
- },
- },
- },
- },
+ PreferredDuringSchedulingIgnoredDuringExecution: zoneNodeSelectorExpression,
}
}
s.Client, err = deployDeployment(client, cdp)
@@ -152,43 +153,28 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Command: []string{"/bin/bash", "-c", "sleep 10000000"},
Port: NetperfServerCtlPort,
}
+ workerNodeSelectorExpression := &apiv1.NodeSelector{
+ NodeSelectorTerms: []apiv1.NodeSelectorTerm{
+ {
+ MatchExpressions: []apiv1.NodeSelectorRequirement{
+ {Key: "node-role.kubernetes.io/worker", Operator: apiv1.NodeSelectorOpIn, Values: []string{""}},
+ },
+ },
+ },
+ }
if z != "" {
if num_nodes > 1 {
cdpAcross.NodeAffinity = apiv1.NodeAffinity{
- PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{
- {
- Weight: 100,
- Preference: apiv1.NodeSelectorTerm{
- MatchExpressions: []apiv1.NodeSelectorRequirement{
- {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}},
- },
- },
- },
- },
- RequiredDuringSchedulingIgnoredDuringExecution: &apiv1.NodeSelector{
- NodeSelectorTerms: []apiv1.NodeSelectorTerm{
- {
- MatchExpressions: []apiv1.NodeSelectorRequirement{
- {Key: "node-role.kubernetes.io/worker", Operator: apiv1.NodeSelectorOpIn, Values: []string{""}},
- },
- },
- },
- },
+ PreferredDuringSchedulingIgnoredDuringExecution: zoneNodeSelectorExpression,
+ RequiredDuringSchedulingIgnoredDuringExecution: workerNodeSelectorExpression,
}
} else {
cdpAcross.NodeAffinity = apiv1.NodeAffinity{
- RequiredDuringSchedulingIgnoredDuringExecution: &apiv1.NodeSelector{
- NodeSelectorTerms: []apiv1.NodeSelectorTerm{
- {
- MatchExpressions: []apiv1.NodeSelectorRequirement{
- {Key: "node-role.kubernetes.io/worker", Operator: apiv1.NodeSelectorOpIn, Values: []string{""}},
- },
- },
- },
- },
+ RequiredDuringSchedulingIgnoredDuringExecution: workerNodeSelectorExpression,
}
}
}

if ncount > 1 {
if s.HostNetwork {
s.ClientHost, err = deployDeployment(client, cdpHostAcross)
@@ -225,37 +211,12 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
affinity := apiv1.NodeAffinity{}
if num_nodes > 1 {
affinity = apiv1.NodeAffinity{
- PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{
- {
- Weight: 100,
- Preference: apiv1.NodeSelectorTerm{
- MatchExpressions: []apiv1.NodeSelectorRequirement{
- {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}},
- },
- },
- },
- },
- RequiredDuringSchedulingIgnoredDuringExecution: &apiv1.NodeSelector{
- NodeSelectorTerms: []apiv1.NodeSelectorTerm{
- {
- MatchExpressions: []apiv1.NodeSelectorRequirement{
- {Key: "node-role.kubernetes.io/worker", Operator: apiv1.NodeSelectorOpIn, Values: []string{""}},
- },
- },
- },
- },
+ PreferredDuringSchedulingIgnoredDuringExecution: zoneNodeSelectorExpression,
+ RequiredDuringSchedulingIgnoredDuringExecution: workerNodeSelectorExpression,
}
} else {
affinity = apiv1.NodeAffinity{
- RequiredDuringSchedulingIgnoredDuringExecution: &apiv1.NodeSelector{
- NodeSelectorTerms: []apiv1.NodeSelectorTerm{
- {
- MatchExpressions: []apiv1.NodeSelectorRequirement{
- {Key: "node-role.kubernetes.io/worker", Operator: apiv1.NodeSelectorOpIn, Values: []string{""}},
- },
- },
- },
- },
+ RequiredDuringSchedulingIgnoredDuringExecution: workerNodeSelectorExpression,
}
}
sdp.NodeAffinity = affinity
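Net effect of the kubernetes.go changes: four inline affinity literals collapse into the two shared selector values declared near the top of BuildSUT. A condensed sketch of the resulting pattern, where the buildAffinity helper is hypothetical (the commit keeps this logic inline):

// apiv1 is k8s.io/api/core/v1, as in the diff above.
// Soft preference for the requested zone, hard requirement for worker nodes.
func buildAffinity(zone []apiv1.PreferredSchedulingTerm, worker *apiv1.NodeSelector, multiNode bool) apiv1.NodeAffinity {
	affinity := apiv1.NodeAffinity{
		RequiredDuringSchedulingIgnoredDuringExecution: worker,
	}
	if multiNode {
		// The zone preference only matters when more than one candidate node exists.
		affinity.PreferredDuringSchedulingIgnoredDuringExecution = zone
	}
	return affinity
}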
