Skip to content

Commit

Permalink
chore: added series aggregation for group by with value type panel (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aniketio-ctrl authored Jan 14, 2025
1 parent dbe78e5 commit 5708079
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 29 deletions.
30 changes: 30 additions & 0 deletions pkg/query-service/app/metrics/v4/helpers/series_agg_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package helpers

import (
"fmt"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func AddSecondaryAggregation(seriesAggregator v3.SecondaryAggregation, query string) string {
queryImpl := "SELECT %s as aggregated_value, ts" +
" FROM (%s)" +
" GROUP BY ts" +
" ORDER BY ts"

var op string
switch seriesAggregator {
case v3.SecondaryAggregationAvg:
op = "avg(value)"
query = fmt.Sprintf(queryImpl, op, query)
case v3.SecondaryAggregationSum:
op = "sum(value)"
query = fmt.Sprintf(queryImpl, op, query)
case v3.SecondaryAggregationMin:
op = "min(value)"
query = fmt.Sprintf(queryImpl, op, query)
case v3.SecondaryAggregationMax:
op = "max(value)"
query = fmt.Sprintf(queryImpl, op, query)
}
return query
}
4 changes: 4 additions & 0 deletions pkg/query-service/app/metrics/v4/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
mq.SpaceAggregation = percentileOperator
}

if panelType == v3.PanelTypeValue && len(mq.GroupBy) > 0 {
query = helpers.AddSecondaryAggregation(mq.SecondaryAggregation, query)
}

return query, nil
}

Expand Down
146 changes: 146 additions & 0 deletions pkg/query-service/app/metrics/v4/query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,3 +614,149 @@ func TestPrepareMetricQueryGauge(t *testing.T) {
})
}
}

func TestPrepareMetricQueryValueTypePanelWithGroupBY(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "false")
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test temporality = cumulative, panel = value, series agg = max group by state",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorMin,
AggregateAttribute: v3.AttributeKey{
Key: "system_memory_usage",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType("Gauge"),
IsColumn: true,
},
Temporality: v3.Delta,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationAvg,
SecondaryAggregation: v3.SecondaryAggregationMax,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "os_type",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
Operator: v3.FilterOperatorEqual,
Value: "linux",
},
},
},
Expression: "A",
Disabled: false,
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "state",
Order: v3.DirectionDesc,
},
},
GroupBy: []v3.AttributeKey{
{
Key: "state",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
},
Legend: "",
ReduceTo: v3.ReduceToOperatorSum,
Having: []v3.Having{
{
ColumnName: "AVG(system_memory_usage)",
Operator: v3.HavingOperatorGreaterThan,
Value: 5,
},
},
},
expectedQueryContains: "SELECT max(value) as aggregated_value, ts FROM (SELECT state, ts, avg(per_series_value) as value FROM (SELECT fingerprint, any(state) as state, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, anyLast(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'state') as state, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Delta' AND unix_milli >= 1735891200000 AND unix_milli < 1735894800000 AND JSONExtractString(labels, 'os_type') = 'linux') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1735891800000 AND unix_milli < 1735894800000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY state, ts ORDER BY state desc, ts ASC) GROUP BY ts ORDER BY ts",
},
{
name: "test temporality = cumulative, panel = value, series agg = max group by state, host_name",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorMin,
AggregateAttribute: v3.AttributeKey{
Key: "system_memory_usage",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType("Gauge"),
IsColumn: true,
},
Temporality: v3.Cumulative,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationAvg,
SecondaryAggregation: v3.SecondaryAggregationMax,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "os_type",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
Operator: v3.FilterOperatorEqual,
Value: "linux",
},
},
},
Expression: "A",
Disabled: false,
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "state",
Order: v3.DirectionDesc,
},
},
GroupBy: []v3.AttributeKey{
{
Key: "state",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
{
Key: "host_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
},
Legend: "",
ReduceTo: v3.ReduceToOperatorSum,
Having: []v3.Having{
{
ColumnName: "AVG(system_memory_usage)",
Operator: v3.HavingOperatorGreaterThan,
Value: 5,
},
},
},
expectedQueryContains: "SELECT max(value) as aggregated_value, ts FROM (SELECT state, host_name, ts, avg(per_series_value) as value FROM (SELECT fingerprint, any(state) as state, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, anyLast(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'state') as state, JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Cumulative' AND unix_milli >= 1735891200000 AND unix_milli < 1735894800000 AND JSONExtractString(labels, 'os_type') = 'linux') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1735891800000 AND unix_milli < 1735894800000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY state, host_name, ts ORDER BY state desc, host_name ASC, ts ASC) GROUP BY ts ORDER BY ts",
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// 1735891811000 - Friday, 3 January 2025 13:40:11 GMT+05:30
// 1735894811000 - Friday, 3 January 2025 14:30:11 GMT+05:30
query, err := PrepareMetricQuery(1735891811000, 1735894811000, v3.QueryTypeBuilder, v3.PanelTypeValue, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
3 changes: 3 additions & 0 deletions pkg/query-service/app/queryBuilder/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri
for idx, groupBy := range query.GroupBy {
parts = append(parts, fmt.Sprintf("groupBy-%d=%s", idx, groupBy.CacheKey()))
}
if params.CompositeQuery.PanelType == v3.PanelTypeValue {
parts = append(parts, fmt.Sprintf("secondaryAggregation=%s", query.SecondaryAggregation))
}
}

if len(query.Having) > 0 {
Expand Down
17 changes: 9 additions & 8 deletions pkg/query-service/app/queryBuilder/query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1300,13 +1300,14 @@ func TestGenerateCacheKeysMetricsBuilder(t *testing.T) {
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_bucket"},
Temporality: v3.Delta,
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
SecondaryAggregation: v3.SecondaryAggregationMax,
Expression: "A",
AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_bucket"},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
Expand All @@ -1333,7 +1334,7 @@ func TestGenerateCacheKeysMetricsBuilder(t *testing.T) {
},
},
expectedCacheKeys: map[string]string{
"A": "source=metrics&step=60&aggregate=sum_rate&timeAggregation=&spaceAggregation=&aggregateAttribute=signoz_latency_bucket---false&filter-0=key:service_name---false,op:=,value:A&groupBy-0=service_name---false&groupBy-1=le---false&having-0=column:value,op:>,value:100",
"A": "source=metrics&step=60&aggregate=sum_rate&timeAggregation=&spaceAggregation=&aggregateAttribute=signoz_latency_bucket---false&filter-0=key:service_name---false,op:=,value:A&groupBy-0=service_name---false&groupBy-1=le---false&secondaryAggregation=max&having-0=column:value,op:>,value:100",
},
},
{
Expand Down
71 changes: 50 additions & 21 deletions pkg/query-service/model/v3/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,28 @@ func GetPercentileFromOperator(operator SpaceAggregation) float64 {
}
}

type SecondaryAggregation string

const (
SecondaryAggregationUnspecified SecondaryAggregation = ""
SecondaryAggregationSum SecondaryAggregation = "sum"
SecondaryAggregationAvg SecondaryAggregation = "avg"
SecondaryAggregationMin SecondaryAggregation = "min"
SecondaryAggregationMax SecondaryAggregation = "max"
)

func (s SecondaryAggregation) Validate() error {
switch s {
case SecondaryAggregationSum,
SecondaryAggregationAvg,
SecondaryAggregationMin,
SecondaryAggregationMax:
return nil
default:
return fmt.Errorf("invalid series aggregation: %s", s)
}
}

type FunctionName string

const (
Expand Down Expand Up @@ -784,27 +806,28 @@ func (m *MetricValueFilter) Clone() *MetricValueFilter {
}

type BuilderQuery struct {
QueryName string `json:"queryName"`
StepInterval int64 `json:"stepInterval"`
DataSource DataSource `json:"dataSource"`
AggregateOperator AggregateOperator `json:"aggregateOperator"`
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
Temporality Temporality `json:"temporality,omitempty"`
Filters *FilterSet `json:"filters,omitempty"`
GroupBy []AttributeKey `json:"groupBy,omitempty"`
Expression string `json:"expression"`
Disabled bool `json:"disabled"`
Having []Having `json:"having,omitempty"`
Legend string `json:"legend,omitempty"`
Limit uint64 `json:"limit"`
Offset uint64 `json:"offset"`
PageSize uint64 `json:"pageSize"`
OrderBy []OrderBy `json:"orderBy,omitempty"`
ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"`
QueryName string `json:"queryName"`
StepInterval int64 `json:"stepInterval"`
DataSource DataSource `json:"dataSource"`
AggregateOperator AggregateOperator `json:"aggregateOperator"`
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
Temporality Temporality `json:"temporality,omitempty"`
Filters *FilterSet `json:"filters,omitempty"`
GroupBy []AttributeKey `json:"groupBy,omitempty"`
Expression string `json:"expression"`
Disabled bool `json:"disabled"`
Having []Having `json:"having,omitempty"`
Legend string `json:"legend,omitempty"`
Limit uint64 `json:"limit"`
Offset uint64 `json:"offset"`
PageSize uint64 `json:"pageSize"`
OrderBy []OrderBy `json:"orderBy,omitempty"`
ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
SecondaryAggregation SecondaryAggregation `json:"seriesAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"`
ShiftBy int64
IsAnomaly bool
QueriesUsedInFormula []string
Expand Down Expand Up @@ -958,6 +981,12 @@ func (b *BuilderQuery) Validate(panelType PanelType) error {
// return fmt.Errorf("group by is not supported for list panel type")
// }

if panelType == PanelTypeValue && len(b.GroupBy) > 0 {
if err := b.SecondaryAggregation.Validate(); err != nil {
return fmt.Errorf("series aggregation is required for value type panel with group by: %w", err)
}
}

for _, groupBy := range b.GroupBy {
if err := groupBy.Validate(); err != nil {
return fmt.Errorf("group by is invalid %w", err)
Expand Down

0 comments on commit 5708079

Please sign in to comment.