diff --git a/.gitignore b/.gitignore index c4aae29..86c93ff 100644 --- a/.gitignore +++ b/.gitignore @@ -110,4 +110,5 @@ tmp bin # local -.DS_Store \ No newline at end of file +.DS_Store +.idea diff --git a/go.mod b/go.mod index 0e08564..747e884 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 github.com/iancoleman/strcase v0.2.0 github.com/influxdata/influxdb-client-go/v2 v2.12.3 - github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241023112228-d36cbd2f1d9e + github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a github.com/instill-ai/x v0.4.0-alpha github.com/knadh/koanf v1.5.0 diff --git a/go.sum b/go.sum index 144769a..3a3e33b 100644 --- a/go.sum +++ b/go.sum @@ -1090,8 +1090,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0 github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241023112228-d36cbd2f1d9e h1:qUCFv38Xl9Gn9MMAYoHR97i7iQ/L3b8zvj/TX/viZlU= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241023112228-d36cbd2f1d9e/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee h1:onnzrn5jabO3jDLPo2193Ql6YMRyDWDx9K834Bfi8V0= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw= github.com/instill-ai/x v0.4.0-alpha h1:zQV2VLbSHjMv6gyBN/2mwwrvWk0/mJM6ZKS12AzjfQg= diff --git a/pkg/handler/publichandler.go b/pkg/handler/publichandler.go index bbfdf14..9d762c3 100644 --- a/pkg/handler/publichandler.go +++ b/pkg/handler/publichandler.go @@ -23,11 +23,11 @@ import ( "github.com/instill-ai/mgmt-backend/pkg/logger" "github.com/instill-ai/mgmt-backend/pkg/service" "github.com/instill-ai/mgmt-backend/pkg/usage" + "github.com/instill-ai/x/checkfield" custom_otel "github.com/instill-ai/mgmt-backend/pkg/logger/otel" healthcheckPB "github.com/instill-ai/protogen-go/common/healthcheck/v1beta" mgmtPB "github.com/instill-ai/protogen-go/core/mgmt/v1beta" - checkfield "github.com/instill-ai/x/checkfield" ) // TODO: Validate mask based on the field behavior. Currently, the fields are hard-coded. @@ -1049,6 +1049,40 @@ func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req return &resp, nil } +// ListModelTriggerChartRecords returns a timeline of model trigger counts for a given requester. The +// response will contain one set of records (datapoints), representing the amount of triggers in a time bucket. +func (h *PublicHandler) ListModelTriggerChartRecords(ctx context.Context, req *mgmtPB.ListModelTriggerChartRecordsRequest) (*mgmtPB.ListModelTriggerChartRecordsResponse, error) { + + eventName := "ListModelTriggerChartRecords" + ctx, span := tracer.Start(ctx, eventName, + trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + + logUUID, _ := uuid.NewV4() + logger, _ := logger.GetZapLogger(ctx) + + ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + + resp, err := h.Service.ListModelTriggerChartRecords(ctx, req, ctxUserUID) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + + logger.Info(string(custom_otel.NewLogMessage( + span, + logUUID.String(), + ctxUserUID, + eventName, + ))) + + return resp, nil +} + func (h *PublicHandler) ListUserMemberships(ctx context.Context, req *mgmtPB.ListUserMembershipsRequest) (*mgmtPB.ListUserMembershipsResponse, error) { eventName := "ListUserMemberships" diff --git a/pkg/repository/influx.go b/pkg/repository/influx.go index 73816d0..313c2e2 100644 --- a/pkg/repository/influx.go +++ b/pkg/repository/influx.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/gofrs/uuid" "github.com/influxdata/influxdb-client-go/v2/api" "github.com/influxdata/influxdb-client-go/v2/log" "go.einride.tech/aip/filtering" @@ -22,6 +23,7 @@ import ( "github.com/instill-ai/mgmt-backend/pkg/logger" "github.com/instill-ai/x/paginate" + errdomain "github.com/instill-ai/mgmt-backend/pkg/errors" mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta" ) @@ -33,6 +35,7 @@ type InfluxDB interface { QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error) QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error) QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error) + ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) Bucket() string QueryAPI() api.QueryAPI @@ -404,7 +407,7 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String())) } - if result.Record().Table() != currentTablePosition { + if result.Record().Table() != currentTablePosition { // only insert a new object when iterated to a new pipeline chartRecord = &mgmtpb.PipelineTriggerChartRecord{} if v, match := result.Record().ValueByKey(constant.PipelineID).(string); match { @@ -447,6 +450,84 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s return records, nil } +const qModelTriggerChartRecords = ` +from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s") + |> filter(fn: (r) => r._field == "trigger_time") + |> group(columns:["requester_uid"]) + |> aggregateWindow(every: %s, column:"_value", fn: count, createEmpty: true, offset: %s) +` + +// ListModelTriggerChartRecordsParams contains the required information to +// query the model triggers of a namespace. +type ListModelTriggerChartRecordsParams struct { + RequesterID string + RequesterUID uuid.UUID + AggregationWindow time.Duration + Start time.Time + Stop time.Time +} + +func (i *influxDB) ListModelTriggerChartRecords( + ctx context.Context, + p ListModelTriggerChartRecordsParams, +) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) { + l, _ := logger.GetZapLogger(ctx) + l = l.With(zap.Reflect("triggerChartParams", p)) + + query := fmt.Sprintf( + qModelTriggerChartRecords, + i.Bucket(), + p.Start.Format(time.RFC3339Nano), + p.Stop.Format(time.RFC3339Nano), + p.RequesterUID.String(), + p.AggregationWindow, + AggregationWindowOffset(p.Start).String(), + ) + result, err := i.QueryAPI().Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err) + } + + defer result.Close() + + record := &mgmtpb.ModelTriggerChartRecord{ + RequesterId: p.RequesterID, + TimeBuckets: []*timestamppb.Timestamp{}, + TriggerCounts: []int32{}, + } + + // Until filtering and grouping are implemented, we'll only have one record + // (total triggers by requester). + records := []*mgmtpb.ModelTriggerChartRecord{record} + + for result.Next() { + t := result.Record().Time() + record.TimeBuckets = append(record.TimeBuckets, timestamppb.New(t)) + + v, match := result.Record().Value().(int64) + if !match { + l.With(zap.Time("_time", result.Record().Time())). + Error("Missing count on model trigger chart record.") + } + + record.TriggerCounts = append(record.TriggerCounts, int32(v)) + } + + if result.Err() != nil { + return nil, fmt.Errorf("collecting information from model trigger chart records: %w", err) + } + + if result.Record() == nil { + return nil, nil + } + + return &mgmtpb.ListModelTriggerChartRecordsResponse{ + ModelTriggerChartRecords: records, + }, nil +} + // TranspileFilter transpiles a parsed AIP filter expression to Flux query expression func (i *influxDB) transpileFilter(filter filtering.Filter) (string, error) { return (&Transpiler{ diff --git a/pkg/service/metric.go b/pkg/service/metric.go index f3b2986..7eae95b 100644 --- a/pkg/service/metric.go +++ b/pkg/service/metric.go @@ -5,11 +5,12 @@ import ( "errors" "fmt" "strings" + "time" + "github.com/gofrs/uuid" "go.einride.tech/aip/filtering" "gorm.io/gorm" - "github.com/gofrs/uuid" "github.com/instill-ai/mgmt-backend/internal/resource" "github.com/instill-ai/mgmt-backend/pkg/acl" "github.com/instill-ai/mgmt-backend/pkg/constant" @@ -163,6 +164,47 @@ func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mg return pipelineTriggerChartRecords, nil } +func (s *service) ListModelTriggerChartRecords( + ctx context.Context, + req *mgmtpb.ListModelTriggerChartRecordsRequest, + ctxUserUID uuid.UUID, +) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) { + nsUID, err := s.GrantedNamespaceUID(ctx, req.GetRequesterId(), ctxUserUID) + if err != nil { + return nil, fmt.Errorf("checking user permissions: %w", err) + } + + now := time.Now().UTC() + p := repository.ListModelTriggerChartRecordsParams{ + RequesterID: req.GetRequesterId(), + RequesterUID: nsUID, + + // Default values + AggregationWindow: 1 * time.Hour, + Start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()), + Stop: now, + } + + if req.GetAggregationWindow() != "" { + window, err := time.ParseDuration(req.GetAggregationWindow()) + if err != nil { + return nil, fmt.Errorf("%w: extracting duration from aggregation window: %w", errdomain.ErrInvalidArgument, err) + } + + p.AggregationWindow = window + } + + if req.GetStart() != nil { + p.Start = req.GetStart().AsTime() + } + + if req.GetStop() != nil { + p.Stop = req.GetStop().AsTime() + } + + return s.influxDB.ListModelTriggerChartRecords(ctx, p) +} + // GrantedNamespaceUID returns the UID of a namespace, provided the // authenticated user has access to it. func (s *service) GrantedNamespaceUID(ctx context.Context, namespaceID string, authenticatedUserUID uuid.UUID) (uuid.UUID, error) { diff --git a/pkg/service/service.go b/pkg/service/service.go index da3c893..380cbfd 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -73,6 +73,7 @@ type Service interface { ListPipelineTriggerRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerRecord, int64, string, error) ListPipelineTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerTableRecord, int64, string, error) ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error) + ListModelTriggerChartRecords(ctx context.Context, req *mgmtpb.ListModelTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtpb.User, error) DBUsers2PBUsers(ctx context.Context, dbUsers []*datamodel.Owner) ([]*mgmtpb.User, error)