diff --git a/go.mod b/go.mod index 747e884..acfcacf 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 github.com/iancoleman/strcase v0.2.0 github.com/influxdata/influxdb-client-go/v2 v2.12.3 - github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee + github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104082033-0f013f505929 github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a github.com/instill-ai/x v0.4.0-alpha github.com/knadh/koanf v1.5.0 diff --git a/go.sum b/go.sum index 3a3e33b..6cfc871 100644 --- a/go.sum +++ b/go.sum @@ -1090,8 +1090,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0 github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee h1:onnzrn5jabO3jDLPo2193Ql6YMRyDWDx9K834Bfi8V0= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104082033-0f013f505929 h1:qDYT1EF7vy+lnmc/A4AV/WorGJvJP4knOu3iatJ4kDs= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104082033-0f013f505929/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw= github.com/instill-ai/x v0.4.0-alpha h1:zQV2VLbSHjMv6gyBN/2mwwrvWk0/mJM6ZKS12AzjfQg= diff --git a/pkg/handler/publichandler.go b/pkg/handler/publichandler.go index 9d762c3..3c2420a 100644 --- a/pkg/handler/publichandler.go +++ b/pkg/handler/publichandler.go @@ -853,6 +853,39 @@ func (h *PublicHandler) ValidateToken(ctx context.Context, req *mgmtPB.ValidateT return &mgmtPB.ValidateTokenResponse{UserUid: userUID}, nil } +// GetModelTriggerCount returns the model trigger count of a given +// requester within a timespan. Results are grouped by trigger status. +func (h *PublicHandler) GetModelTriggerCount(ctx context.Context, req *mgmtPB.GetModelTriggerCountRequest) (*mgmtPB.GetModelTriggerCountResponse, error) { + eventName := "GetModelTriggerCount" + ctx, span := tracer.Start(ctx, eventName, + trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + + logUUID, _ := uuid.NewV4() + logger, _ := logger.GetZapLogger(ctx) + + ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + + resp, err := h.Service.GetModelTriggerCount(ctx, req, ctxUserUID) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, fmt.Errorf("fetching credit chart records: %w", err) + } + + logger.Info(string(custom_otel.NewLogMessage( + span, + logUUID.String(), + ctxUserUID, + eventName, + ))) + + return resp, nil +} + func (h *PublicHandler) ListPipelineTriggerRecords(ctx context.Context, req *mgmtPB.ListPipelineTriggerRecordsRequest) (*mgmtPB.ListPipelineTriggerRecordsResponse, error) { eventName := "ListPipelineTriggerRecords" ctx, span := tracer.Start(ctx, eventName, diff --git a/pkg/repository/influx.go b/pkg/repository/influx.go index 313c2e2..4a7135a 100644 --- a/pkg/repository/influx.go +++ b/pkg/repository/influx.go @@ -35,6 +35,7 @@ type InfluxDB interface { QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error) QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error) QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error) + GetModelTriggerCount(ctx context.Context, p GetModelTriggerCountParams) (*mgmtpb.GetModelTriggerCountResponse, error) ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) Bucket() string @@ -450,6 +451,82 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s return records, nil } +const qModelTriggerCount = ` +from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s") + |> filter(fn: (r) => r._field == "trigger_time") + |> group(columns: ["requester_uid", "status"]) + |> count(column: "_value") +` + +// GetModelTriggerCountParams contains the required information to +// query the model trigger count of a namespace. +// TODO jvallesm: this should be defined in the service package for better +// decoupling. At the moment this implies breaking an import cycle with many +// dependencies. +type GetModelTriggerCountParams struct { + RequesterUID uuid.UUID + Start time.Time + Stop time.Time +} + +func (i *influxDB) GetModelTriggerCount( + ctx context.Context, + p GetModelTriggerCountParams, +) (*mgmtpb.GetModelTriggerCountResponse, error) { + l, _ := logger.GetZapLogger(ctx) + l = l.With(zap.Reflect("triggerCountParams", p)) + + query := fmt.Sprintf( + qModelTriggerCount, + i.Bucket(), + p.Start.Format(time.RFC3339Nano), + p.Stop.Format(time.RFC3339Nano), + p.RequesterUID.String(), + ) + result, err := i.QueryAPI().Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err) + } + + defer result.Close() + + // We'll have one record per status. + countRecords := make([]*mgmtpb.TriggerCount, 0, 2) + for result.Next() { + l := l.With(zap.Time("_time", result.Record().Time())) + + statusStr := result.Record().ValueByKey("status").(string) + status := mgmtpb.Status(mgmtpb.Status_value[statusStr]) + if status == mgmtpb.Status_STATUS_UNSPECIFIED { + l.Error("Missing status on trigger count record.") + } + + count, match := result.Record().Value().(int64) + if !match { + l.Error("Missing count on model trigger count record.") + } + + countRecords = append(countRecords, &mgmtpb.TriggerCount{ + TriggerCount: int32(count), + Status: &status, + }) + } + + if result.Err() != nil { + return nil, fmt.Errorf("collecting information from model trigger count records: %w", err) + } + + if result.Record() == nil { + return nil, nil + } + + return &mgmtpb.GetModelTriggerCountResponse{ + ModelTriggerCounts: countRecords, + }, nil +} + const qModelTriggerChartRecords = ` from(bucket: "%s") |> range(start: %s, stop: %s) diff --git a/pkg/service/metric.go b/pkg/service/metric.go index 7eae95b..ffc76bc 100644 --- a/pkg/service/metric.go +++ b/pkg/service/metric.go @@ -164,6 +164,32 @@ func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mg return pipelineTriggerChartRecords, nil } +func (s *service) GetModelTriggerCount(ctx context.Context, req *mgmtpb.GetModelTriggerCountRequest, ctxUserUID uuid.UUID) (*mgmtpb.GetModelTriggerCountResponse, error) { + requesterUID, err := s.GrantedNamespaceUID(ctx, req.GetRequesterId(), ctxUserUID) + if err != nil { + return nil, fmt.Errorf("checking user permissions: %w", err) + } + + now := time.Now().UTC() + p := repository.GetModelTriggerCountParams{ + RequesterUID: requesterUID, + + // Default values + Start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()), + Stop: now, + } + + if req.GetStart() != nil { + p.Start = req.GetStart().AsTime() + } + + if req.GetStop() != nil { + p.Stop = req.GetStop().AsTime() + } + + return s.influxDB.GetModelTriggerCount(ctx, p) +} + func (s *service) ListModelTriggerChartRecords( ctx context.Context, req *mgmtpb.ListModelTriggerChartRecordsRequest, diff --git a/pkg/service/service.go b/pkg/service/service.go index 380cbfd..778c1a6 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -73,6 +73,7 @@ type Service interface { ListPipelineTriggerRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerRecord, int64, string, error) ListPipelineTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerTableRecord, int64, string, error) ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error) + GetModelTriggerCount(ctx context.Context, req *mgmtpb.GetModelTriggerCountRequest, ctxUserUID uuid.UUID) (*mgmtpb.GetModelTriggerCountResponse, error) ListModelTriggerChartRecords(ctx context.Context, req *mgmtpb.ListModelTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtpb.User, error)