Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metric): model trigger chart record api #245

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,5 @@ tmp
bin

# local
.DS_Store
.DS_Store
.idea
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/iancoleman/strcase v0.2.0
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241023112228-d36cbd2f1d9e
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
github.com/instill-ai/x v0.4.0-alpha
github.com/knadh/koanf v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1090,8 +1090,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241023112228-d36cbd2f1d9e h1:qUCFv38Xl9Gn9MMAYoHR97i7iQ/L3b8zvj/TX/viZlU=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241023112228-d36cbd2f1d9e/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee h1:onnzrn5jabO3jDLPo2193Ql6YMRyDWDx9K834Bfi8V0=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
github.com/instill-ai/x v0.4.0-alpha h1:zQV2VLbSHjMv6gyBN/2mwwrvWk0/mJM6ZKS12AzjfQg=
Expand Down
1 change: 1 addition & 0 deletions pkg/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
PipelineUID string = "pipeline_uid"
PipelineReleaseID string = "pipeline_release_id"
PipelineReleaseUID string = "pipeline_release_uid"
ModelID string = "model_id"
TriggerMode string = "trigger_mode"
Status string = "status"
)
Expand Down
36 changes: 35 additions & 1 deletion pkg/handler/publichandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"github.com/instill-ai/mgmt-backend/pkg/logger"
"github.com/instill-ai/mgmt-backend/pkg/service"
"github.com/instill-ai/mgmt-backend/pkg/usage"
"github.com/instill-ai/x/checkfield"

custom_otel "github.com/instill-ai/mgmt-backend/pkg/logger/otel"
healthcheckPB "github.com/instill-ai/protogen-go/common/healthcheck/v1beta"
mgmtPB "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
checkfield "github.com/instill-ai/x/checkfield"
)

// TODO: Validate mask based on the field behavior. Currently, the fields are hard-coded.
Expand Down Expand Up @@ -1049,6 +1049,40 @@ func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req
return &resp, nil
}

// ListModelTriggerChartRecords returns a timeline of model trigger counts for a given requester. The
// response will contain one set of records (datapoints), representing the amount of triggers in a time bucket.
func (h *PublicHandler) ListModelTriggerChartRecords(ctx context.Context, req *mgmtPB.ListModelTriggerChartRecordsRequest) (*mgmtPB.ListModelTriggerChartRecordsResponse, error) {

eventName := "ListModelTriggerChartRecords"
ctx, span := tracer.Start(ctx, eventName,
trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

logUUID, _ := uuid.NewV4()
logger, _ := logger.GetZapLogger(ctx)

ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false)
if err != nil {
span.SetStatus(1, err.Error())
return nil, err
}

resp, err := h.Service.ListModelTriggerChartRecords(ctx, req, ctxUserUID)
if err != nil {
span.SetStatus(1, err.Error())
return nil, fmt.Errorf("fetching credit chart records: %w", err)
}

logger.Info(string(custom_otel.NewLogMessage(
span,
logUUID.String(),
ctxUserUID,
eventName,
)))

return resp, nil
}

func (h *PublicHandler) ListUserMemberships(ctx context.Context, req *mgmtPB.ListUserMembershipsRequest) (*mgmtPB.ListUserMembershipsResponse, error) {

eventName := "ListUserMemberships"
Expand Down
177 changes: 176 additions & 1 deletion pkg/repository/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"time"

"github.com/gofrs/uuid"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/log"
"go.einride.tech/aip/filtering"
Expand Down Expand Up @@ -33,6 +34,7 @@ type InfluxDB interface {
QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error)
QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error)
QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error)
ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error)

Bucket() string
QueryAPI() api.QueryAPI
Expand Down Expand Up @@ -405,7 +407,7 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
}

if result.Record().Table() != currentTablePosition {
chartRecord = &mgmtpb.PipelineTriggerChartRecord{}
chartRecord = &mgmtpb.PipelineTriggerChartRecord{} // only insert a new object when iterated to a new model

if v, match := result.Record().ValueByKey(constant.PipelineID).(string); match {
chartRecord.PipelineId = v
Expand Down Expand Up @@ -447,6 +449,179 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
return records, nil
}

// todo: merge changes for new pipeline dashboard endpoints and refactor this part
// const qModelTriggerChartRecords = `
// from(bucket: "%s")
// |> range(start: %s, stop: %s)
// |> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s")
// |> filter(fn: (r) => r._field == "trigger_time")
// |> group(columns:["requester_uid"])
// |> aggregateWindow(every: %s, column:"_value", fn: count, createEmpty: true, offset: %s)
// `

// ListModelTriggerChartRecordsParams contains the required information to
// query the model triggers of a namespace.
type ListModelTriggerChartRecordsParams struct {
NamespaceID string
RequesterUID uuid.UUID
AggregationWindow time.Duration
Start time.Time
Stop time.Time
}

func (i *influxDB) ListModelTriggerChartRecords(
ctx context.Context,
p ListModelTriggerChartRecordsParams,
) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) {
// todo: merge changes for new pipeline dashboard endpoints and refactor this part
joremysh marked this conversation as resolved.
Show resolved Hide resolved
// l, _ := logger.GetZapLogger(ctx)
// l = l.With(zap.Reflect("triggerChartParams", p))

// query := fmt.Sprintf(
// qModelTriggerChartRecords,
// i.Bucket(),
// p.Start.Format(time.RFC3339Nano),
// p.Stop.Format(time.RFC3339Nano),
// p.RequesterUID.String(),
// p.AggregationWindow,
// AggregationWindowOffset(p.Start).String(),
// )
// result, err := i.QueryAPI().Query(ctx, query)
// if err != nil {
// return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err)
// }
//
// defer result.Close()
//
// record := &mgmtpb.ModelTriggerChartRecord{
// RequesterId: p.NamespaceID,
// TimeBuckets: []*timestamppb.Timestamp{},
// TriggerCounts: []int32{},
// }
//
// // Until filtering and grouping are implemented, we'll only have one record
// // (total triggers by requester).
// records := []*mgmtpb.ModelTriggerChartRecord{record}
//
// for result.Next() {
// t := result.Record().Time()
// record.TimeBuckets = append(record.TimeBuckets, timestamppb.New(t))
//
// v, match := result.Record().Value().(int64)
// if !match {
// l.With(zap.Time("_time", result.Record().Time())).
// Error("Missing count on model trigger chart record.")
// }
//
// record.TriggerCounts = append(record.TriggerCounts, int32(v))
// }
//
// if result.Err() != nil {
// return nil, fmt.Errorf("collecting information from model trigger chart records: %w", err)
// }
//
// if result.Record() == nil {
// return nil, nil
// }
//
// return &mgmtpb.ListModelTriggerChartRecordsResponse{
// ModelTriggerChartRecords: records,
// }, nil
logger, _ := logger.GetZapLogger(ctx)

query := fmt.Sprintf(
`base =
from(bucket: "%s")
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "model.trigger.v1" and r.requester_uid == "%s")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
bucketBase =
base
|> group(columns: ["model_uid"])
|> sort(columns: ["trigger_time"])
bucketTrigger =
bucketBase
|> aggregateWindow(
every: %v,
column: "trigger_time",
fn: count,
createEmpty: false,
)
bucketDuration =
bucketBase
|> aggregateWindow(
every: %v,
fn: sum,
column: "compute_time_duration",
createEmpty: false,
)
bucket =
join(
tables: {t1: bucketTrigger, t2: bucketDuration},
on: ["_start", "_stop", "_time", "model_uid"],
)
nameMap =
base
|> keep(columns: ["trigger_time", "model_id", "model_uid"])
|> group(columns: ["model_uid"])
|> top(columns: ["trigger_time"], n: 1)
|> drop(columns: ["trigger_time"])
join(tables: {t1: bucket, t2: nameMap}, on: ["model_uid"])`,
i.bucket,
p.Start.Format(time.RFC3339Nano),
p.Stop.Format(time.RFC3339Nano),
p.RequesterUID.String(),
p.AggregationWindow,
p.AggregationWindow,
)

result, err := i.api.Query(ctx, query)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error())
}

var currentTablePosition = -1
var chartRecord *mgmtpb.ModelTriggerChartRecord
var records []*mgmtpb.ModelTriggerChartRecord
// Iterate over query response
for result.Next() {
// Notice when group key has changed
if result.TableChanged() {
logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String()))
}

if result.Record().Table() != currentTablePosition {
chartRecord = &mgmtpb.ModelTriggerChartRecord{}

if v, match := result.Record().ValueByKey(constant.ModelID).(string); match {
chartRecord.ModelId = &v
}
chartRecord.TimeBuckets = []*timestamppb.Timestamp{}
chartRecord.TriggerCounts = []int32{}
records = append(records, chartRecord)
currentTablePosition = result.Record().Table()
}

if v, match := result.Record().ValueByKey("_time").(time.Time); match {
chartRecord.TimeBuckets = append(chartRecord.TimeBuckets, timestamppb.New(v))
}
if v, match := result.Record().ValueByKey(constant.TriggerTime).(int64); match {
chartRecord.TriggerCounts = append(chartRecord.TriggerCounts, int32(v))
}
}
// Check for an error
if result.Err() != nil {
return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error())
}
if result.Record() == nil {
return nil, nil
}

return &mgmtpb.ListModelTriggerChartRecordsResponse{
ModelTriggerChartRecords: records,
}, nil
}

// TranspileFilter transpiles a parsed AIP filter expression to Flux query expression
func (i *influxDB) transpileFilter(filter filtering.Filter) (string, error) {
return (&Transpiler{
Expand Down
44 changes: 43 additions & 1 deletion pkg/service/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/gofrs/uuid"
"go.einride.tech/aip/filtering"
"gorm.io/gorm"

"github.com/gofrs/uuid"
"github.com/instill-ai/mgmt-backend/internal/resource"
"github.com/instill-ai/mgmt-backend/pkg/acl"
"github.com/instill-ai/mgmt-backend/pkg/constant"
Expand Down Expand Up @@ -163,6 +164,47 @@ func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mg
return pipelineTriggerChartRecords, nil
}

func (s *service) ListModelTriggerChartRecords(
ctx context.Context,
req *mgmtpb.ListModelTriggerChartRecordsRequest,
ctxUserUID uuid.UUID,
) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) {
nsUID, err := s.GrantedNamespaceUID(ctx, req.GetRequesterId(), ctxUserUID)
if err != nil {
return nil, fmt.Errorf("checking user permissions: %w", err)
}

now := time.Now().UTC()
p := repository.ListModelTriggerChartRecordsParams{
NamespaceID: req.GetRequesterId(),
RequesterUID: nsUID,

// Default values
AggregationWindow: 1 * time.Hour,
Start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
Stop: now,
}

if req.GetAggregationWindow() != "" {
window, err := time.ParseDuration(req.GetAggregationWindow())
if err != nil {
return nil, fmt.Errorf("%w: extracting duration from aggregation window: %w", errdomain.ErrInvalidArgument, err)
}

p.AggregationWindow = window
}

if req.GetStart() != nil {
p.Start = req.GetStart().AsTime()
}

if req.GetStop() != nil {
p.Stop = req.GetStop().AsTime()
}

return s.influxDB.ListModelTriggerChartRecords(ctx, p)
}

// GrantedNamespaceUID returns the UID of a namespace, provided the
// authenticated user has access to it.
func (s *service) GrantedNamespaceUID(ctx context.Context, namespaceID string, authenticatedUserUID uuid.UUID) (uuid.UUID, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Service interface {
ListPipelineTriggerRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerRecord, int64, string, error)
ListPipelineTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerTableRecord, int64, string, error)
ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error)
ListModelTriggerChartRecords(ctx context.Context, req *mgmtpb.ListModelTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtpb.ListModelTriggerChartRecordsResponse, error)

DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtpb.User, error)
DBUsers2PBUsers(ctx context.Context, dbUsers []*datamodel.Owner) ([]*mgmtpb.User, error)
Expand Down
Loading