Skip to content

Commit

Permalink
feat(metric): model trigger counts api
Browse files Browse the repository at this point in the history
  • Loading branch information
joremysh committed Nov 4, 2024
1 parent 89cce72 commit 34ba1d1
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/iancoleman/strcase v0.2.0
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104082033-0f013f505929
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
github.com/instill-ai/x v0.4.0-alpha
github.com/knadh/koanf v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1090,8 +1090,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee h1:onnzrn5jabO3jDLPo2193Ql6YMRyDWDx9K834Bfi8V0=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104082033-0f013f505929 h1:qDYT1EF7vy+lnmc/A4AV/WorGJvJP4knOu3iatJ4kDs=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104082033-0f013f505929/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
github.com/instill-ai/x v0.4.0-alpha h1:zQV2VLbSHjMv6gyBN/2mwwrvWk0/mJM6ZKS12AzjfQg=
Expand Down
33 changes: 33 additions & 0 deletions pkg/handler/publichandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,39 @@ func (h *PublicHandler) ValidateToken(ctx context.Context, req *mgmtPB.ValidateT
return &mgmtPB.ValidateTokenResponse{UserUid: userUID}, nil
}

// GetModelTriggerCount returns the model trigger count of a given
// requester within a timespan. Results are grouped by trigger status.
func (h *PublicHandler) GetModelTriggerCount(ctx context.Context, req *mgmtPB.GetModelTriggerCountRequest) (*mgmtPB.GetModelTriggerCountResponse, error) {
eventName := "GetModelTriggerCount"
ctx, span := tracer.Start(ctx, eventName,
trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

logUUID, _ := uuid.NewV4()
logger, _ := logger.GetZapLogger(ctx)

ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false)
if err != nil {
span.SetStatus(1, err.Error())
return nil, err
}

resp, err := h.Service.GetModelTriggerCount(ctx, req, ctxUserUID)
if err != nil {
span.SetStatus(1, err.Error())
return nil, fmt.Errorf("fetching credit chart records: %w", err)
}

logger.Info(string(custom_otel.NewLogMessage(
span,
logUUID.String(),
ctxUserUID,
eventName,
)))

return resp, nil
}

func (h *PublicHandler) ListPipelineTriggerRecords(ctx context.Context, req *mgmtPB.ListPipelineTriggerRecordsRequest) (*mgmtPB.ListPipelineTriggerRecordsResponse, error) {
eventName := "ListPipelineTriggerRecords"
ctx, span := tracer.Start(ctx, eventName,
Expand Down
77 changes: 77 additions & 0 deletions pkg/repository/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type InfluxDB interface {
QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error)
QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error)
QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error)
GetModelTriggerCount(ctx context.Context, p GetModelTriggerCountParams) (*mgmtpb.GetModelTriggerCountResponse, error)
ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error)

Bucket() string
Expand Down Expand Up @@ -450,6 +451,82 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
return records, nil
}

const qModelTriggerCount = `
from(bucket: "%s")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s")
|> filter(fn: (r) => r._field == "trigger_time")
|> group(columns: ["requester_uid", "status"])
|> count(column: "_value")
`

// GetModelTriggerCountParams contains the required information to
// query the model trigger count of a namespace.
// TODO jvallesm: this should be defined in the service package for better
// decoupling. At the moment this implies breaking an import cycle with many
// dependencies.
type GetModelTriggerCountParams struct {
RequesterUID uuid.UUID
Start time.Time
Stop time.Time
}

func (i *influxDB) GetModelTriggerCount(
ctx context.Context,
p GetModelTriggerCountParams,
) (*mgmtpb.GetModelTriggerCountResponse, error) {
l, _ := logger.GetZapLogger(ctx)
l = l.With(zap.Reflect("triggerCountParams", p))

query := fmt.Sprintf(
qModelTriggerCount,
i.Bucket(),
p.Start.Format(time.RFC3339Nano),
p.Stop.Format(time.RFC3339Nano),
p.RequesterUID.String(),
)
result, err := i.QueryAPI().Query(ctx, query)
if err != nil {
return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err)
}

defer result.Close()

// We'll have one record per status.
countRecords := make([]*mgmtpb.TriggerCount, 0, 2)
for result.Next() {
l := l.With(zap.Time("_time", result.Record().Time()))

statusStr := result.Record().ValueByKey("status").(string)
status := mgmtpb.Status(mgmtpb.Status_value[statusStr])
if status == mgmtpb.Status_STATUS_UNSPECIFIED {
l.Error("Missing status on trigger count record.")
}

count, match := result.Record().Value().(int64)
if !match {
l.Error("Missing count on model trigger count record.")
}

countRecords = append(countRecords, &mgmtpb.TriggerCount{
TriggerCount: int32(count),
Status: &status,
})
}

if result.Err() != nil {
return nil, fmt.Errorf("collecting information from model trigger count records: %w", err)
}

if result.Record() == nil {
return nil, nil
}

return &mgmtpb.GetModelTriggerCountResponse{
ModelTriggerCounts: countRecords,
}, nil
}

const qModelTriggerChartRecords = `
from(bucket: "%s")
|> range(start: %s, stop: %s)
Expand Down
26 changes: 26 additions & 0 deletions pkg/service/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,32 @@ func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mg
return pipelineTriggerChartRecords, nil
}

func (s *service) GetModelTriggerCount(ctx context.Context, req *mgmtpb.GetModelTriggerCountRequest, ctxUserUID uuid.UUID) (*mgmtpb.GetModelTriggerCountResponse, error) {
requesterUID, err := s.GrantedNamespaceUID(ctx, req.GetRequesterId(), ctxUserUID)
if err != nil {
return nil, fmt.Errorf("checking user permissions: %w", err)
}

now := time.Now().UTC()
p := repository.GetModelTriggerCountParams{
RequesterUID: requesterUID,

// Default values
Start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
Stop: now,
}

if req.GetStart() != nil {
p.Start = req.GetStart().AsTime()
}

if req.GetStop() != nil {
p.Stop = req.GetStop().AsTime()
}

return s.influxDB.GetModelTriggerCount(ctx, p)
}

func (s *service) ListModelTriggerChartRecords(
ctx context.Context,
req *mgmtpb.ListModelTriggerChartRecordsRequest,
Expand Down
1 change: 1 addition & 0 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Service interface {
ListPipelineTriggerRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerRecord, int64, string, error)
ListPipelineTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerTableRecord, int64, string, error)
ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error)
GetModelTriggerCount(ctx context.Context, req *mgmtpb.GetModelTriggerCountRequest, ctxUserUID uuid.UUID) (*mgmtpb.GetModelTriggerCountResponse, error)
ListModelTriggerChartRecords(ctx context.Context, req *mgmtpb.ListModelTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtpb.ListModelTriggerChartRecordsResponse, error)

DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtpb.User, error)
Expand Down

0 comments on commit 34ba1d1

Please sign in to comment.