diff --git a/pkg/query-service/app/metrics/v4/helpers/series_agg_helper.go b/pkg/query-service/app/metrics/v4/helpers/series_agg_helper.go new file mode 100644 index 00000000000..a4bfcb1be22 --- /dev/null +++ b/pkg/query-service/app/metrics/v4/helpers/series_agg_helper.go @@ -0,0 +1,30 @@ +package helpers + +import ( + "fmt" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func AddSecondaryAggregation(seriesAggregator v3.SecondaryAggregation, query string) string { + queryImpl := "SELECT %s as aggregated_value, ts" + + " FROM (%s)" + + " GROUP BY ts" + + " ORDER BY ts" + + var op string + switch seriesAggregator { + case v3.SecondaryAggregationAvg: + op = "avg(value)" + query = fmt.Sprintf(queryImpl, op, query) + case v3.SecondaryAggregationSum: + op = "sum(value)" + query = fmt.Sprintf(queryImpl, op, query) + case v3.SecondaryAggregationMin: + op = "min(value)" + query = fmt.Sprintf(queryImpl, op, query) + case v3.SecondaryAggregationMax: + op = "max(value)" + query = fmt.Sprintf(queryImpl, op, query) + } + return query +} diff --git a/pkg/query-service/app/metrics/v4/query_builder.go b/pkg/query-service/app/metrics/v4/query_builder.go index 8725162d67b..6f4d96b2707 100644 --- a/pkg/query-service/app/metrics/v4/query_builder.go +++ b/pkg/query-service/app/metrics/v4/query_builder.go @@ -89,6 +89,10 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P mq.SpaceAggregation = percentileOperator } + if panelType == v3.PanelTypeValue && len(mq.GroupBy) > 0 { + query = helpers.AddSecondaryAggregation(mq.SecondaryAggregation, query) + } + return query, nil } diff --git a/pkg/query-service/app/metrics/v4/query_builder_test.go b/pkg/query-service/app/metrics/v4/query_builder_test.go index be1f5f65ba2..c35028beb01 100644 --- a/pkg/query-service/app/metrics/v4/query_builder_test.go +++ b/pkg/query-service/app/metrics/v4/query_builder_test.go @@ -614,3 +614,149 @@ func TestPrepareMetricQueryGauge(t *testing.T) { }) } } + +func TestPrepareMetricQueryValueTypePanelWithGroupBY(t *testing.T) { + t.Setenv("USE_METRICS_PRE_AGGREGATION", "false") + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + expectedQueryContains string + }{ + { + name: "test temporality = cumulative, panel = value, series agg = max group by state", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorMin, + AggregateAttribute: v3.AttributeKey{ + Key: "system_memory_usage", + DataType: v3.AttributeKeyDataTypeFloat64, + Type: v3.AttributeKeyType("Gauge"), + IsColumn: true, + }, + Temporality: v3.Delta, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationAvg, + SecondaryAggregation: v3.SecondaryAggregationMax, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + Operator: v3.FilterOperatorEqual, + Value: "linux", + }, + }, + }, + Expression: "A", + Disabled: false, + StepInterval: 60, + OrderBy: []v3.OrderBy{ + { + ColumnName: "state", + Order: v3.DirectionDesc, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "state", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + }, + Legend: "", + ReduceTo: v3.ReduceToOperatorSum, + Having: []v3.Having{ + { + ColumnName: "AVG(system_memory_usage)", + Operator: v3.HavingOperatorGreaterThan, + Value: 5, + }, + }, + }, + expectedQueryContains: "SELECT max(value) as aggregated_value, ts FROM (SELECT state, ts, avg(per_series_value) as value FROM (SELECT fingerprint, any(state) as state, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, anyLast(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'state') as state, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Delta' AND unix_milli >= 1735891200000 AND unix_milli < 1735894800000 AND JSONExtractString(labels, 'os_type') = 'linux') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1735891800000 AND unix_milli < 1735894800000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY state, ts ORDER BY state desc, ts ASC) GROUP BY ts ORDER BY ts", + }, + { + name: "test temporality = cumulative, panel = value, series agg = max group by state, host_name", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorMin, + AggregateAttribute: v3.AttributeKey{ + Key: "system_memory_usage", + DataType: v3.AttributeKeyDataTypeFloat64, + Type: v3.AttributeKeyType("Gauge"), + IsColumn: true, + }, + Temporality: v3.Cumulative, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationAvg, + SecondaryAggregation: v3.SecondaryAggregationMax, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + Operator: v3.FilterOperatorEqual, + Value: "linux", + }, + }, + }, + Expression: "A", + Disabled: false, + StepInterval: 60, + OrderBy: []v3.OrderBy{ + { + ColumnName: "state", + Order: v3.DirectionDesc, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "state", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + { + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + }, + Legend: "", + ReduceTo: v3.ReduceToOperatorSum, + Having: []v3.Having{ + { + ColumnName: "AVG(system_memory_usage)", + Operator: v3.HavingOperatorGreaterThan, + Value: 5, + }, + }, + }, + expectedQueryContains: "SELECT max(value) as aggregated_value, ts FROM (SELECT state, host_name, ts, avg(per_series_value) as value FROM (SELECT fingerprint, any(state) as state, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, anyLast(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'state') as state, JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Cumulative' AND unix_milli >= 1735891200000 AND unix_milli < 1735894800000 AND JSONExtractString(labels, 'os_type') = 'linux') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1735891800000 AND unix_milli < 1735894800000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY state, host_name, ts ORDER BY state desc, host_name ASC, ts ASC) GROUP BY ts ORDER BY ts", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + // 1735891811000 - Friday, 3 January 2025 13:40:11 GMT+05:30 + // 1735894811000 - Friday, 3 January 2025 14:30:11 GMT+05:30 + query, err := PrepareMetricQuery(1735891811000, 1735894811000, v3.QueryTypeBuilder, v3.PanelTypeValue, testCase.builderQuery, metricsV3.Options{}) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} diff --git a/pkg/query-service/app/queryBuilder/query_builder.go b/pkg/query-service/app/queryBuilder/query_builder.go index 9b5be469af7..cfb86a99446 100644 --- a/pkg/query-service/app/queryBuilder/query_builder.go +++ b/pkg/query-service/app/queryBuilder/query_builder.go @@ -425,6 +425,9 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri for idx, groupBy := range query.GroupBy { parts = append(parts, fmt.Sprintf("groupBy-%d=%s", idx, groupBy.CacheKey())) } + if params.CompositeQuery.PanelType == v3.PanelTypeValue { + parts = append(parts, fmt.Sprintf("secondaryAggregation=%s", query.SecondaryAggregation)) + } } if len(query.Having) > 0 { diff --git a/pkg/query-service/app/queryBuilder/query_builder_test.go b/pkg/query-service/app/queryBuilder/query_builder_test.go index 1ee73cd0959..8e8414d9c08 100644 --- a/pkg/query-service/app/queryBuilder/query_builder_test.go +++ b/pkg/query-service/app/queryBuilder/query_builder_test.go @@ -1300,13 +1300,14 @@ func TestGenerateCacheKeysMetricsBuilder(t *testing.T) { QueryType: v3.QueryTypeBuilder, BuilderQueries: map[string]*v3.BuilderQuery{ "A": { - QueryName: "A", - StepInterval: 60, - DataSource: v3.DataSourceMetrics, - AggregateOperator: v3.AggregateOperatorSumRate, - Expression: "A", - AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_bucket"}, - Temporality: v3.Delta, + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorSumRate, + SecondaryAggregation: v3.SecondaryAggregationMax, + Expression: "A", + AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_bucket"}, + Temporality: v3.Delta, Filters: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -1333,7 +1334,7 @@ func TestGenerateCacheKeysMetricsBuilder(t *testing.T) { }, }, expectedCacheKeys: map[string]string{ - "A": "source=metrics&step=60&aggregate=sum_rate&timeAggregation=&spaceAggregation=&aggregateAttribute=signoz_latency_bucket---false&filter-0=key:service_name---false,op:=,value:A&groupBy-0=service_name---false&groupBy-1=le---false&having-0=column:value,op:>,value:100", + "A": "source=metrics&step=60&aggregate=sum_rate&timeAggregation=&spaceAggregation=&aggregateAttribute=signoz_latency_bucket---false&filter-0=key:service_name---false,op:=,value:A&groupBy-0=service_name---false&groupBy-1=le---false&secondaryAggregation=max&having-0=column:value,op:>,value:100", }, }, { diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 158718a12b4..49fd1b1d35e 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -712,6 +712,28 @@ func GetPercentileFromOperator(operator SpaceAggregation) float64 { } } +type SecondaryAggregation string + +const ( + SecondaryAggregationUnspecified SecondaryAggregation = "" + SecondaryAggregationSum SecondaryAggregation = "sum" + SecondaryAggregationAvg SecondaryAggregation = "avg" + SecondaryAggregationMin SecondaryAggregation = "min" + SecondaryAggregationMax SecondaryAggregation = "max" +) + +func (s SecondaryAggregation) Validate() error { + switch s { + case SecondaryAggregationSum, + SecondaryAggregationAvg, + SecondaryAggregationMin, + SecondaryAggregationMax: + return nil + default: + return fmt.Errorf("invalid series aggregation: %s", s) + } +} + type FunctionName string const ( @@ -784,27 +806,28 @@ func (m *MetricValueFilter) Clone() *MetricValueFilter { } type BuilderQuery struct { - QueryName string `json:"queryName"` - StepInterval int64 `json:"stepInterval"` - DataSource DataSource `json:"dataSource"` - AggregateOperator AggregateOperator `json:"aggregateOperator"` - AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"` - Temporality Temporality `json:"temporality,omitempty"` - Filters *FilterSet `json:"filters,omitempty"` - GroupBy []AttributeKey `json:"groupBy,omitempty"` - Expression string `json:"expression"` - Disabled bool `json:"disabled"` - Having []Having `json:"having,omitempty"` - Legend string `json:"legend,omitempty"` - Limit uint64 `json:"limit"` - Offset uint64 `json:"offset"` - PageSize uint64 `json:"pageSize"` - OrderBy []OrderBy `json:"orderBy,omitempty"` - ReduceTo ReduceToOperator `json:"reduceTo,omitempty"` - SelectColumns []AttributeKey `json:"selectColumns,omitempty"` - TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"` - SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"` - Functions []Function `json:"functions,omitempty"` + QueryName string `json:"queryName"` + StepInterval int64 `json:"stepInterval"` + DataSource DataSource `json:"dataSource"` + AggregateOperator AggregateOperator `json:"aggregateOperator"` + AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"` + Temporality Temporality `json:"temporality,omitempty"` + Filters *FilterSet `json:"filters,omitempty"` + GroupBy []AttributeKey `json:"groupBy,omitempty"` + Expression string `json:"expression"` + Disabled bool `json:"disabled"` + Having []Having `json:"having,omitempty"` + Legend string `json:"legend,omitempty"` + Limit uint64 `json:"limit"` + Offset uint64 `json:"offset"` + PageSize uint64 `json:"pageSize"` + OrderBy []OrderBy `json:"orderBy,omitempty"` + ReduceTo ReduceToOperator `json:"reduceTo,omitempty"` + SelectColumns []AttributeKey `json:"selectColumns,omitempty"` + TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"` + SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"` + SecondaryAggregation SecondaryAggregation `json:"seriesAggregation,omitempty"` + Functions []Function `json:"functions,omitempty"` ShiftBy int64 IsAnomaly bool QueriesUsedInFormula []string @@ -958,6 +981,12 @@ func (b *BuilderQuery) Validate(panelType PanelType) error { // return fmt.Errorf("group by is not supported for list panel type") // } + if panelType == PanelTypeValue && len(b.GroupBy) > 0 { + if err := b.SecondaryAggregation.Validate(); err != nil { + return fmt.Errorf("series aggregation is required for value type panel with group by: %w", err) + } + } + for _, groupBy := range b.GroupBy { if err := groupBy.Validate(); err != nil { return fmt.Errorf("group by is invalid %w", err)