Skip to content

Commit

Permalink
feat(metric): model trigger table record api
Browse files Browse the repository at this point in the history
  • Loading branch information
joremysh committed Nov 4, 2024
1 parent 89cce72 commit 8c0dda7
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/iancoleman/strcase v0.2.0
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104034549-4b92cf27ff00
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
github.com/instill-ai/x v0.4.0-alpha
github.com/knadh/koanf v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1090,8 +1090,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee h1:onnzrn5jabO3jDLPo2193Ql6YMRyDWDx9K834Bfi8V0=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104034549-4b92cf27ff00 h1:65aiWEWp8ayWm64aCfa7nOoNSjsiX6zXs1VBadJ/JhY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104034549-4b92cf27ff00/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
github.com/instill-ai/x v0.4.0-alpha h1:zQV2VLbSHjMv6gyBN/2mwwrvWk0/mJM6ZKS12AzjfQg=
Expand Down
2 changes: 2 additions & 0 deletions pkg/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
PipelineUID string = "pipeline_uid"
PipelineReleaseID string = "pipeline_release_id"
PipelineReleaseUID string = "pipeline_release_uid"
ModelID string = "model_id"
ModelUID string = "model_uid"
TriggerMode string = "trigger_mode"
Status string = "status"
)
Expand Down
64 changes: 64 additions & 0 deletions pkg/handler/publichandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,70 @@ func (h *PublicHandler) ListPipelineTriggerTableRecords(ctx context.Context, req
return &resp, nil
}

func (h *PublicHandler) ListModelTriggerTableRecords(ctx context.Context, req *mgmtPB.ListModelTriggerTableRecordsRequest) (*mgmtPB.ListModelTriggerTableRecordsResponse, error) {

eventName := "ListModelTriggerTableRecords"
ctx, span := tracer.Start(ctx, eventName,
trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

logUUID, _ := uuid.NewV4()

logger, _ := logger.GetZapLogger(ctx)

ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false)
if err != nil {
span.SetStatus(1, err.Error())
return nil, err
}
pbUser, err := h.Service.GetUserByUIDAdmin(ctx, ctxUserUID)
if err != nil {
span.SetStatus(1, err.Error())
return nil, err
}

declarations, err := filtering.NewDeclarations([]filtering.DeclarationOption{
filtering.DeclareStandardFunctions(),
filtering.DeclareIdent(constant.Start, filtering.TypeTimestamp),
filtering.DeclareIdent(constant.Stop, filtering.TypeTimestamp),
filtering.DeclareIdent(strcase.ToLowerCamel(constant.OwnerName), filtering.TypeString),
filtering.DeclareIdent(strcase.ToLowerCamel(constant.ModelID), filtering.TypeString),
filtering.DeclareIdent(strcase.ToLowerCamel(constant.ModelUID), filtering.TypeString),
}...)
if err != nil {
span.SetStatus(1, err.Error())
return nil, err
}

filter, err := filtering.ParseFilter(req, declarations)
if err != nil {
span.SetStatus(1, err.Error())
return nil, err
}

modelTriggerTableRecords, totalSize, nextPageToken, err := h.Service.ListModelTriggerTableRecords(ctx, pbUser, int64(req.GetPageSize()), req.GetPageToken(), filter)
if err != nil {
span.SetStatus(1, err.Error())
return nil, err
}

resp := mgmtPB.ListModelTriggerTableRecordsResponse{
ModelTriggerTableRecords: modelTriggerTableRecords,
NextPageToken: nextPageToken,
TotalSize: int32(totalSize),
}

logger.Info(string(custom_otel.NewLogMessage(
span,
logUUID.String(),
uuid.FromStringOrNil(*pbUser.Uid),
eventName,
custom_otel.SetEventResult(fmt.Sprintf("Total records retrieved: %v", totalSize)),
)))

return &resp, nil
}

// ListPipelineTriggerChartRecords returns a timeline of a requester's pipeline
// trigger count.
func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req *mgmtPB.ListPipelineTriggerChartRecordsRequest) (*mgmtPB.ListPipelineTriggerChartRecordsResponse, error) {
Expand Down
187 changes: 187 additions & 0 deletions pkg/repository/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var defaultAggregationWindow = time.Hour.Nanoseconds()
type InfluxDB interface {
QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error)
QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error)
QueryModelTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.ModelTriggerTableRecord, totalSize int64, nextPageToken string, err error)
QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error)
ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error)

Expand Down Expand Up @@ -121,6 +122,192 @@ func AggregationWindowOffset(t time.Time) time.Duration {
return t.Sub(startOfDay)
}

func (i *influxDB) QueryModelTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string,
pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.ModelTriggerTableRecord,
totalSize int64, nextPageToken string, err error) {

logger, _ := logger.GetZapLogger(ctx)

if pageSize == 0 {
pageSize = DefaultPageSize
} else if pageSize > MaxPageSize {
pageSize = MaxPageSize
}

start := time.Time{}.Format(time.RFC3339Nano)
stop := time.Now().Format(time.RFC3339Nano)
mostRecetTimeFilter := time.Now().Format(time.RFC3339Nano)

// TODO: validate owner uid from token
if pageToken != "" {
mostRecetTime, _, err := paginate.DecodeToken(pageToken)
if err != nil {
return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid page token: %s", err.Error())
}
mostRecetTime = mostRecetTime.Add(time.Duration(-1))
mostRecetTimeFilter = mostRecetTime.Format(time.RFC3339Nano)
}

// TODO: design better filter expression to flux transpiler
expr, err := i.transpileFilter(filter)
if err != nil {
return nil, 0, "", status.Error(codes.Internal, err.Error())
}

if expr != "" {
exprs := strings.Split(expr, "&&")
for i, expr := range exprs {
if strings.HasPrefix(expr, constant.Start) {
start = strings.Split(expr, "@")[1]
exprs[i] = ""
}
if strings.HasPrefix(expr, constant.Stop) {
stop = strings.Split(expr, "@")[1]
exprs[i] = ""
}
}
expr = strings.Join(exprs, "")
}

baseQuery := fmt.Sprintf(
`base =
from(bucket: "%v")
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "model.trigger.v1")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
%v
%v
triggerRank =
base
|> drop(
columns: [
"owner_uid",
"trigger_mode",
"compute_time_duration",
"model_trigger_id",
"status",
],
)
|> group(columns: ["model_uid"])
|> map(fn: (r) => ({r with trigger_time: time(v: r.trigger_time)}))
|> sort(columns: ["trigger_time"], desc: true)
|> first(column: "trigger_time")
|> rename(columns: {trigger_time: "most_recent_trigger_time"})
triggerCount =
base
|> drop(
columns: ["owner_uid", "trigger_mode", "compute_time_duration", "model_trigger_id"],
)
|> group(columns: ["model_uid", "status"])
|> count(column: "trigger_time")
|> rename(columns: {trigger_time: "trigger_count"})
|> group(columns: ["model_uid"])
triggerTable =
join(tables: {t1: triggerRank, t2: triggerCount}, on: ["model_uid"])
|> group()
|> pivot(
rowKey: ["model_uid", "most_recent_trigger_time"],
columnKey: ["status"],
valueColumn: "trigger_count",
)
|> filter(
fn: (r) => r["most_recent_trigger_time"] < time(v: %v)
)
nameMap =
base
|> keep(columns: ["trigger_time", "model_id", "model_uid"])
|> group(columns: ["model_uid"])
|> top(columns: ["trigger_time"], n: 1)
|> drop(columns: ["trigger_time"])
|> group()
join(tables: {t1: triggerTable, t2: nameMap}, on: ["model_uid"])`,
i.bucket,
start,
stop,
ownerQueryString,
expr,
mostRecetTimeFilter,
)

query := fmt.Sprintf(
`%v
|> group()
|> sort(columns: ["most_recent_trigger_time"], desc: true)
|> limit(n: %v)`,
baseQuery,
pageSize,
)

totalQuery := fmt.Sprintf(
`%v
|> group()
|> count(column: "model_uid")`,
baseQuery,
)

var lastTimestamp time.Time

result, err := i.api.Query(ctx, query)
if err != nil {
return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error())
} else {
// Iterate over query response
for result.Next() {
// Notice when group key has changed
if result.TableChanged() {
logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String()))
}

tableRecord := &mgmtpb.ModelTriggerTableRecord{}

if v, match := result.Record().ValueByKey(constant.ModelID).(string); match {
tableRecord.ModelId = v
}
if v, match := result.Record().ValueByKey(constant.ModelUID).(string); match {
tableRecord.ModelUid = v
}
if v, match := result.Record().ValueByKey(mgmtpb.Status_STATUS_COMPLETED.String()).(int64); match {
tableRecord.TriggerCountCompleted = int32(v)
}
if v, match := result.Record().ValueByKey(mgmtpb.Status_STATUS_ERRORED.String()).(int64); match {
tableRecord.TriggerCountErrored = int32(v)
}

records = append(records, tableRecord)
}

// Check for an error
if result.Err() != nil {
return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error())
}
if result.Record() == nil {
return nil, 0, "", nil
}

if v, match := result.Record().ValueByKey("most_recent_trigger_time").(time.Time); match {
lastTimestamp = v
}
}

var total int64
totalQueryResult, err := i.api.Query(ctx, totalQuery)
if err != nil {
return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid total query: %s", err.Error())
} else {
if totalQueryResult.Next() {
total = totalQueryResult.Record().ValueByKey(constant.ModelUID).(int64)
}
}

if int64(len(records)) < total {
pageToken = paginate.EncodeToken(lastTimestamp, owner)
} else {
pageToken = ""
}

return records, int64(len(records)), pageToken, nil
}

func (i *influxDB) QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error) {

logger, _ := logger.GetZapLogger(ctx)
Expand Down
15 changes: 15 additions & 0 deletions pkg/service/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,21 @@ func (s *service) ListPipelineTriggerTableRecords(ctx context.Context, owner *mg
return pipelineTriggerTableRecords, ps, pt, nil
}

func (s *service) ListModelTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.ModelTriggerTableRecord, int64, string, error) {

ownerUID, _, _, ownerQueryString, filter, err := s.checkPipelineOwnership(ctx, filter, owner)
if err != nil {
return []*mgmtpb.ModelTriggerTableRecord{}, 0, "", err
}

modelTriggerTableRecords, ps, pt, err := s.influxDB.QueryModelTriggerTableRecords(ctx, *ownerUID, ownerQueryString, pageSize, pageToken, filter)
if err != nil {
return nil, 0, "", err
}

return modelTriggerTableRecords, ps, pt, nil
}

func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error) {

ownerUID, ownerID, ownerType, ownerQueryString, filter, err := s.checkPipelineOwnership(ctx, filter, owner)
Expand Down
1 change: 1 addition & 0 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Service interface {
ListPipelineTriggerRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerRecord, int64, string, error)
ListPipelineTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerTableRecord, int64, string, error)
ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error)
ListModelTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.ModelTriggerTableRecord, int64, string, error)
ListModelTriggerChartRecords(ctx context.Context, req *mgmtpb.ListModelTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtpb.ListModelTriggerChartRecordsResponse, error)

DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtpb.User, error)
Expand Down

0 comments on commit 8c0dda7

Please sign in to comment.